@@ -94,6 +94,37 @@ private static void publishToPartitions(
9494 latchAssert (publishLatch ).completes ();
9595 }
9696
97+ static AutoCloseable publishToPartitions (TestUtils .ClientFactory cf , List <String > partitions ) {
98+ Client client = cf .get ();
99+ for (int i = 0 ; i < partitions .size (); i ++) {
100+ assertThat (client .declarePublisher (b (i ), null , partitions .get (i )).isOk ()).isTrue ();
101+ }
102+ Runnable publish =
103+ () -> {
104+ int count = 0 ;
105+ while (!Thread .currentThread ().isInterrupted ()) {
106+ int partitionIndex = count ++ % partitions .size ();
107+ String partition = partitions .get (partitionIndex );
108+ client .publish (
109+ b (partitionIndex ),
110+ Collections .singletonList (
111+ client
112+ .messageBuilder ()
113+ .addData (partition .getBytes (StandardCharsets .UTF_8 ))
114+ .build ()));
115+ try {
116+ Thread .sleep (10 );
117+ } catch (InterruptedException e ) {
118+ Thread .currentThread ().interrupt ();
119+ break ;
120+ }
121+ }
122+ };
123+ Thread thread = new Thread (publish );
124+ thread .start ();
125+ return thread ::interrupt ;
126+ }
127+
97128 @ Test
98129 void consumeAllMessagesFromAllPartitions () {
99130 declareSuperStreamTopology (configurationClient , superStream , partitionCount );
@@ -299,62 +330,62 @@ void autoOffsetTrackingShouldStoreOffsetZero() {
299330 @ BrokerVersionAtLeast (RABBITMQ_3_11_11 )
300331 void rebalancedPartitionShouldGetMessagesWhenItComesBackToOriginalConsumerInstance ()
301332 throws Exception {
333+ Duration timeout = Duration .ofSeconds (60 );
302334 declareSuperStreamTopology (configurationClient , superStream , partitionCount );
303335 Client client = cf .get ();
304336 List <String > partitions = client .partitions (superStream );
305- int messageCount = 10_000 ;
306- publishToPartitions (cf , partitions , messageCount );
307- String consumerName = "my-app" ;
308- Set <String > receivedPartitions = ConcurrentHashMap .newKeySet (partitionCount );
309- Runnable processing =
310- () -> {
311- try {
312- Thread .sleep (10 );
313- } catch (InterruptedException e ) {
314- // OK
315- }
316- };
317- Consumer consumer1 =
318- environment
319- .consumerBuilder ()
320- .superStream (superStream )
321- .singleActiveConsumer ()
322- .offset (OffsetSpecification .first ())
323- .name (consumerName )
324- .autoTrackingStrategy ()
325- .messageCountBeforeStorage (messageCount / partitionCount / 50 )
326- .builder ()
327- .messageHandler (
328- (context , message ) -> {
329- receivedPartitions .add (context .stream ());
330- processing .run ();
331- })
332- .build ();
333- waitAtMost (() -> receivedPartitions .size () == partitions .size (),
334- () -> format ("Expected to receive messages from all partitions, got %s" , receivedPartitions ));
337+ try (AutoCloseable publish = publishToPartitions (cf , partitions )) {
338+ int messageCountBeforeStorage = 10 ;
339+ String consumerName = "my-app" ;
340+ Set <String > receivedPartitions = ConcurrentHashMap .newKeySet (partitionCount );
341+ Consumer consumer1 =
342+ environment
343+ .consumerBuilder ()
344+ .superStream (superStream )
345+ .singleActiveConsumer ()
346+ .offset (OffsetSpecification .first ())
347+ .name (consumerName )
348+ .autoTrackingStrategy ()
349+ .messageCountBeforeStorage (messageCountBeforeStorage )
350+ .builder ()
351+ .messageHandler (
352+ (context , message ) -> {
353+ receivedPartitions .add (context .stream ());
354+ })
355+ .build ();
356+ waitAtMost (
357+ timeout ,
358+ () -> receivedPartitions .size () == partitions .size (),
359+ () ->
360+ format (
361+ "Expected to receive messages from all partitions, got %s" , receivedPartitions ));
335362
336- AtomicReference <String > partition = new AtomicReference <>();
337- Consumer consumer2 =
338- environment
339- .consumerBuilder ()
340- .superStream (superStream )
341- .singleActiveConsumer ()
342- .offset (OffsetSpecification .first ())
343- .name (consumerName )
344- .autoTrackingStrategy ()
345- .messageCountBeforeStorage (messageCount / partitionCount / 50 )
346- .builder ()
347- .messageHandler (
348- (context , message ) -> {
349- partition .set (context .stream ());
350- processing .run ();
351- })
352- .build ();
353- waitAtMost (() -> partition .get () != null );
354- consumer2 .close ();
355- receivedPartitions .clear ();
356- waitAtMost (() -> receivedPartitions .size () == partitions .size (),
357- () -> format ("Expected to receive messages from all partitions, got %s" , receivedPartitions ));
358- consumer1 .close ();
363+ AtomicReference <String > partition = new AtomicReference <>();
364+ Consumer consumer2 =
365+ environment
366+ .consumerBuilder ()
367+ .superStream (superStream )
368+ .singleActiveConsumer ()
369+ .offset (OffsetSpecification .first ())
370+ .name (consumerName )
371+ .autoTrackingStrategy ()
372+ .messageCountBeforeStorage (messageCountBeforeStorage )
373+ .builder ()
374+ .messageHandler (
375+ (context , message ) -> {
376+ partition .set (context .stream ());
377+ })
378+ .build ();
379+ waitAtMost (timeout , () -> partition .get () != null );
380+ consumer2 .close ();
381+ receivedPartitions .clear ();
382+ waitAtMost (
383+ timeout ,
384+ () -> receivedPartitions .size () == partitions .size (),
385+ () ->
386+ format (
387+ "Expected to receive messages from all partitions, got %s" , receivedPartitions ));
388+ consumer1 .close ();
389+ }
359390 }
360391}
0 commit comments