diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index a913fd6133ea..eb2d4b3006a6 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,8 +18,8 @@ package org.apache.beam.sdk.io.solace.read; import com.solacesystems.jcsmp.BytesXMLMessage; +import java.util.List; import java.util.Objects; -import java.util.Queue; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -38,7 +38,7 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient Queue safeToAck; + private transient List safeToAck; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -48,14 +48,13 @@ private SolaceCheckpointMark() {} * * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. */ - SolaceCheckpointMark(Queue safeToAck) { + SolaceCheckpointMark(List safeToAck) { this.safeToAck = safeToAck; } @Override public void finalizeCheckpoint() { - BytesXMLMessage msg; - while ((msg = safeToAck.poll()) != null) { + for (BytesXMLMessage msg : safeToAck) { try { msg.ackMessage(); } catch (IllegalStateException e) { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index dc84e0a07017..7c756169ef3e 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -27,7 +27,6 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,6 +41,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -60,12 +60,6 @@ class UnboundedSolaceReader extends UnboundedReader { private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; - /** - * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: - * Accessed by both reader and checkpointing threads. - */ - private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); - /** * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a * {@link SolaceCheckpointMark}. @@ -136,8 +130,6 @@ public boolean start() { @Override public boolean advance() { - finalizeReadyMessages(); - BytesXMLMessage receivedXmlMessage; try { receivedXmlMessage = getSessionService().getReceiver().receive(); @@ -158,27 +150,9 @@ public boolean advance() { @Override public void close() { - finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); } - public void finalizeReadyMessages() { - BytesXMLMessage msg; - while ((msg = safeToAckMessages.poll()) != null) { - try { - msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - safeToAckMessages.add(msg); // In case the error was transient, might succeed later - break; // Commit is only best effort - } - } - } - @Override public Instant getWatermark() { // should be only used by a test receiver @@ -190,9 +164,10 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - safeToAckMessages.addAll(receivedMessages); + + ImmutableList bytesXMLMessages = ImmutableList.copyOf(receivedMessages); receivedMessages.clear(); - return new SolaceCheckpointMark(safeToAckMessages); + return new SolaceCheckpointMark(bytesXMLMessages); } @Override diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index a1f80932eddf..c17ec3e128d2 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. + // consume 1 more message. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // consume 1 more message. No change in the acknowledged messages. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); @@ -473,6 +473,73 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { assertEquals(4, countAckMessages.get()); } + @Test + public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception { + AtomicInteger countConsumedMessages = new AtomicInteger(0); + AtomicInteger countAckMessages = new AtomicInteger(0); + + // Broker that creates input data + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; + + SessionServiceFactory fakeSessionServiceFactory = + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); + + Read spec = + getDefaultRead() + .withSessionServiceFactory(fakeSessionServiceFactory) + .withMaxNumConnections(4); + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + + UnboundedReader reader = + initialSource.createReader(PipelineOptionsFactory.create(), null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 3 messages (NB: #start() already consumed the first message) + for (int i = 0; i < 3; i++) { + assertTrue(String.format("Failed at %d-th message", i), reader.advance()); + } + + // #advance() was called, but the messages were not ready to be acknowledged. + assertEquals(0, countAckMessages.get()); + + // mark all consumed messages as ready to be acknowledged + CheckpointMark checkpointMark = reader.getCheckpointMark(); + + // data is flushed + + // consume 1 more message. + reader.advance(); + assertEquals(0, countAckMessages.get()); + + // consume 1 more message. No change in the acknowledged messages. + reader.advance(); + assertEquals(0, countAckMessages.get()); + + CheckpointMark checkpointMark2 = reader.getCheckpointMark(); + // data is prepared for flushing that will be rejected + + // acknowledge from the first checkpoint may arrive late + checkpointMark.finalizeCheckpoint(); + + assertEquals(4, countAckMessages.get()); + + checkpointMark2.finalizeCheckpoint(); + assertEquals(6, countAckMessages.get()); + } + @Test public void testCheckpointMarkSafety() throws Exception {