2525import static org .apache .beam .sdk .io .jms .CommonJms .toSerializableFunction ;
2626import static org .apache .beam .sdk .io .jms .JmsIO .Writer .JMS_IO_PRODUCER_METRIC_NAME ;
2727import static org .apache .beam .sdk .io .jms .JmsIO .Writer .PUBLICATION_RETRIES_METRIC_NAME ;
28+ import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .Uninterruptibles .sleepUninterruptibly ;
2829import static org .hamcrest .CoreMatchers .allOf ;
2930import static org .hamcrest .MatcherAssert .assertThat ;
3031import static org .hamcrest .Matchers .contains ;
8687import org .apache .beam .sdk .coders .Coder ;
8788import org .apache .beam .sdk .coders .SerializableCoder ;
8889import org .apache .beam .sdk .coders .StringUtf8Coder ;
90+ import org .apache .beam .sdk .io .UnboundedSource ;
8991import org .apache .beam .sdk .io .UnboundedSource .CheckpointMark ;
9092import org .apache .beam .sdk .io .jms .JmsIO .UnboundedJmsReader ;
9193import org .apache .beam .sdk .metrics .MetricNameFilter ;
@@ -541,6 +543,16 @@ public void testSplitForTopic() throws Exception {
541543 assertEquals (1 , splits .size ());
542544 }
543545
546+ private boolean advanceWithRetry (UnboundedSource .UnboundedReader reader ) throws IOException {
547+ for (int attempt = 0 ; attempt < 10 ; attempt ++) {
548+ if (reader .advance ()) {
549+ return true ;
550+ }
551+ sleepUninterruptibly (java .time .Duration .ofMillis (100 ));
552+ }
553+ return false ;
554+ }
555+
544556 @ Test
545557 public void testCheckpointMark () throws Exception {
546558 // we are using no prefetch here
@@ -558,7 +570,7 @@ public void testCheckpointMark() throws Exception {
558570
559571 // consume 3 messages (NB: start already consumed the first message)
560572 for (int i = 0 ; i < 3 ; i ++) {
561- assertTrue (String .format ("Failed at %d-th message" , i ), reader . advance ( ));
573+ assertTrue (String .format ("Failed at %d-th message" , i ), advanceWithRetry ( reader ));
562574 }
563575
564576 // the messages are still pending in the queue (no ACK yet)
@@ -572,7 +584,7 @@ public void testCheckpointMark() throws Exception {
572584
573585 // we read the 6 pending messages
574586 for (int i = 0 ; i < 6 ; i ++) {
575- assertTrue (String .format ("Failed at %d-th message" , i ), reader . advance ( ));
587+ assertTrue (String .format ("Failed at %d-th message" , i ), advanceWithRetry ( reader ));
576588 }
577589
578590 // still 6 pending messages as we didn't finalize the checkpoint
@@ -592,8 +604,8 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
592604 assertTrue (reader .start ());
593605
594606 // consume 2 message (NB: start already consumed the first message)
595- assertTrue (reader . advance ( ));
596- assertTrue (reader . advance ( ));
607+ assertTrue (advanceWithRetry ( reader ));
608+ assertTrue (advanceWithRetry ( reader ));
597609
598610 // get checkpoint mark after consumed 4 messages
599611 CheckpointMark mark = reader .getCheckpointMark ();
@@ -724,7 +736,7 @@ public void testCheckpointMarkSafety() throws Exception {
724736
725737 // consume half the messages (NB: start already consumed the first message)
726738 for (int i = 0 ; i < (messagesToProcess / 2 ) - 1 ; i ++) {
727- assertTrue (reader . advance ( ));
739+ assertTrue (advanceWithRetry ( reader ));
728740 }
729741
730742 // the messages are still pending in the queue (no ACK yet)
@@ -738,7 +750,7 @@ public void testCheckpointMarkSafety() throws Exception {
738750 () -> {
739751 try {
740752 for (int i = 0 ; i < messagesToProcess / 2 ; i ++) {
741- assertTrue (reader . advance ( ));
753+ assertTrue (advanceWithRetry ( reader ));
742754 }
743755 } catch (IOException ex ) {
744756 throw new RuntimeException (ex );
@@ -877,7 +889,7 @@ public void testDiscardCheckpointMark() throws Exception {
877889
878890 // consume 3 more messages (NB: start already consumed the first message)
879891 for (int i = 0 ; i < 3 ; i ++) {
880- assertTrue (reader . advance ( ));
892+ assertTrue (advanceWithRetry ( reader ));
881893 }
882894
883895 // the messages are still pending in the queue (no ACK yet)
@@ -891,7 +903,7 @@ public void testDiscardCheckpointMark() throws Exception {
891903
892904 // we read the 6 pending messages
893905 for (int i = 0 ; i < 6 ; i ++) {
894- assertTrue (reader . advance ( ));
906+ assertTrue (advanceWithRetry ( reader ));
895907 }
896908
897909 // still 6 pending messages as we didn't finalize the checkpoint
0 commit comments