Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.beam.sdk.io.jms.CommonJms.toSerializableFunction;
import static org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME;
import static org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.jms.JmsIO.UnboundedJmsReader;
import org.apache.beam.sdk.metrics.MetricNameFilter;
Expand Down Expand Up @@ -541,6 +543,16 @@ public void testSplitForTopic() throws Exception {
assertEquals(1, splits.size());
}

private boolean advanceWithRetry(UnboundedSource.UnboundedReader reader) throws IOException {
for (int attempt = 0; attempt < 10; attempt++) {
if (reader.advance()) {
return true;
}
sleepUninterruptibly(java.time.Duration.ofMillis(100));
}
return false;
}

@Test
public void testCheckpointMark() throws Exception {
// we are using no prefetch here
Expand All @@ -558,7 +570,7 @@ public void testCheckpointMark() throws Exception {

// consume 3 messages (NB: start already consumed the first message)
for (int i = 0; i < 3; i++) {
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader));
}

// the messages are still pending in the queue (no ACK yet)
Expand All @@ -572,7 +584,7 @@ public void testCheckpointMark() throws Exception {

// we read the 6 pending messages
for (int i = 0; i < 6; i++) {
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
assertTrue(String.format("Failed at %d-th message", i), advanceWithRetry(reader));
}

// still 6 pending messages as we didn't finalize the checkpoint
Expand All @@ -592,8 +604,8 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
assertTrue(reader.start());

// consume 2 message (NB: start already consumed the first message)
assertTrue(reader.advance());
assertTrue(reader.advance());
assertTrue(advanceWithRetry(reader));
assertTrue(advanceWithRetry(reader));

// get checkpoint mark after consumed 4 messages
CheckpointMark mark = reader.getCheckpointMark();
Expand Down Expand Up @@ -724,7 +736,7 @@ public void testCheckpointMarkSafety() throws Exception {

// consume half the messages (NB: start already consumed the first message)
for (int i = 0; i < (messagesToProcess / 2) - 1; i++) {
assertTrue(reader.advance());
assertTrue(advanceWithRetry(reader));
}

// the messages are still pending in the queue (no ACK yet)
Expand All @@ -738,7 +750,7 @@ public void testCheckpointMarkSafety() throws Exception {
() -> {
try {
for (int i = 0; i < messagesToProcess / 2; i++) {
assertTrue(reader.advance());
assertTrue(advanceWithRetry(reader));
}
} catch (IOException ex) {
throw new RuntimeException(ex);
Expand Down Expand Up @@ -877,7 +889,7 @@ public void testDiscardCheckpointMark() throws Exception {

// consume 3 more messages (NB: start already consumed the first message)
for (int i = 0; i < 3; i++) {
assertTrue(reader.advance());
assertTrue(advanceWithRetry(reader));
}

// the messages are still pending in the queue (no ACK yet)
Expand All @@ -891,7 +903,7 @@ public void testDiscardCheckpointMark() throws Exception {

// we read the 6 pending messages
for (int i = 0; i < 6; i++) {
assertTrue(reader.advance());
assertTrue(advanceWithRetry(reader));
}

// still 6 pending messages as we didn't finalize the checkpoint
Expand Down
Loading