diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 6fe655208738..7f3b394d7f6a 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -25,6 +25,7 @@ import static org.apache.beam.sdk.io.jms.CommonJms.toSerializableFunction; import static org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME; import static org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -86,6 +87,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.jms.JmsIO.UnboundedJmsReader; import org.apache.beam.sdk.metrics.MetricNameFilter; @@ -541,6 +543,16 @@ public void testSplitForTopic() throws Exception { assertEquals(1, splits.size()); } + private boolean advanceWithRetry(UnboundedSource.UnboundedReader reader) throws IOException { + for (int attempt = 0; attempt < 10; attempt++) { + if (reader.advance()) { + return true; + } + sleepUninterruptibly(java.time.Duration.ofMillis(100)); + } + return false; + } + @Test public void testCheckpointMark() throws Exception { // we are using no prefetch here @@ -558,7 +570,7 @@ public void testCheckpointMark() throws Exception { // consume 3 messages (NB: start already consumed the first message) for (int i = 0; i < 3; i++) { - assertTrue(String.format("Failed at %d-th message", i), reader.advance()); + assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader)); } // the messages are still pending in the queue (no ACK yet) @@ -572,7 +584,7 @@ public void testCheckpointMark() throws Exception { // we read the 6 pending messages for (int i = 0; i < 6; i++) { - assertTrue(String.format("Failed at %d-th message", i), reader.advance()); + assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader)); } // still 6 pending messages as we didn't finalize the checkpoint @@ -592,8 +604,8 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { assertTrue(reader.start()); // consume 2 message (NB: start already consumed the first message) - assertTrue(reader.advance()); - assertTrue(reader.advance()); + assertTrue(advanceWithRetry(reader)); + assertTrue(advanceWithRetry(reader)); // get checkpoint mark after consumed 4 messages CheckpointMark mark = reader.getCheckpointMark(); @@ -724,7 +736,7 @@ public void testCheckpointMarkSafety() throws Exception { // consume half the messages (NB: start already consumed the first message) for (int i = 0; i < (messagesToProcess / 2) - 1; i++) { - assertTrue(reader.advance()); + assertTrue(advanceWithRetry(reader)); } // the messages are still pending in the queue (no ACK yet) @@ -738,7 +750,7 @@ public void testCheckpointMarkSafety() throws Exception { () -> { try { for (int i = 0; i < messagesToProcess / 2; i++) { - assertTrue(reader.advance()); + assertTrue(advanceWithRetry(reader)); } } catch (IOException ex) { throw new RuntimeException(ex); @@ -877,7 +889,7 @@ public void testDiscardCheckpointMark() throws Exception { // consume 3 more messages (NB: start already consumed the first message) for (int i = 0; i < 3; i++) { - assertTrue(reader.advance()); + assertTrue(advanceWithRetry(reader)); } // the messages are still pending in the queue (no ACK yet) @@ -891,7 +903,7 @@ public void testDiscardCheckpointMark() throws Exception { // we read the 6 pending messages for (int i = 0; i < 6; i++) { - assertTrue(reader.advance()); + assertTrue(advanceWithRetry(reader)); } // still 6 pending messages as we didn't finalize the checkpoint