File tree Expand file tree Collapse file tree 1 file changed +10
-1
lines changed
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms Expand file tree Collapse file tree 1 file changed +10
-1
lines changed Original file line number Diff line number Diff line change 2525import static org .apache .beam .sdk .io .jms .CommonJms .toSerializableFunction ;
2626import static org .apache .beam .sdk .io .jms .JmsIO .Writer .JMS_IO_PRODUCER_METRIC_NAME ;
2727import static org .apache .beam .sdk .io .jms .JmsIO .Writer .PUBLICATION_RETRIES_METRIC_NAME ;
28+ import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .Uninterruptibles .sleepUninterruptibly ;
2829import static org .hamcrest .CoreMatchers .allOf ;
2930import static org .hamcrest .MatcherAssert .assertThat ;
3031import static org .hamcrest .Matchers .contains ;
@@ -558,7 +559,15 @@ public void testCheckpointMark() throws Exception {
558559
559560 // consume 3 messages (NB: start already consumed the first message)
560561 for (int i = 0 ; i < 3 ; i ++) {
561- assertTrue (String .format ("Failed at %d-th message" , i ), reader .advance ());
562+ boolean readerAdvanced = false ;
563+ for (int attempt = 0 ; attempt < 10 ; attempt ++) {
564+ if (reader .advance ()) {
565+ readerAdvanced = true ;
566+ break ;
567+ }
568+ sleepUninterruptibly (java .time .Duration .ofMillis (100 ));
569+ }
570+ assertTrue (String .format ("Failed at %d-th message" , i ), readerAdvanced );
562571 }
563572
564573 // the messages are still pending in the queue (no ACK yet)
You can’t perform that action at this time.
0 commit comments