Skip to content

Commit f4cbb28

Browse files
committed
Revert "SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. "
This reverts commit f277b5ec59ba213af547167080fd8f5bd210e6a7.
1 parent cc8dc2f commit f4cbb28

File tree

3 files changed

+37
-78
lines changed

3 files changed

+37
-78
lines changed

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.beam.sdk.io.solace.read;
1919

2020
import com.solacesystems.jcsmp.BytesXMLMessage;
21-
import java.util.List;
2221
import java.util.Objects;
22+
import java.util.Queue;
2323
import org.apache.beam.sdk.annotations.Internal;
2424
import org.apache.beam.sdk.coders.DefaultCoder;
2525
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +38,7 @@
3838
@VisibleForTesting
3939
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
4040
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
41-
private transient List<BytesXMLMessage> safeToAck;
41+
private transient Queue<BytesXMLMessage> safeToAck;
4242

4343
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
4444
private SolaceCheckpointMark() {}
@@ -48,13 +48,14 @@ private SolaceCheckpointMark() {}
4848
*
4949
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
5050
*/
51-
SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
51+
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
5252
this.safeToAck = safeToAck;
5353
}
5454

5555
@Override
5656
public void finalizeCheckpoint() {
57-
for (BytesXMLMessage msg : safeToAck) {
57+
BytesXMLMessage msg;
58+
while ((msg = safeToAck.poll()) != null) {
5859
try {
5960
msg.ackMessage();
6061
} catch (IllegalStateException e) {

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.NoSuchElementException;
2828
import java.util.Queue;
2929
import java.util.UUID;
30+
import java.util.concurrent.ConcurrentLinkedQueue;
3031
import java.util.concurrent.ExecutionException;
3132
import java.util.concurrent.Executors;
3233
import java.util.concurrent.ScheduledExecutorService;
@@ -41,7 +42,6 @@
4142
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
4243
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
4344
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
44-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
4545
import org.checkerframework.checker.nullness.qual.Nullable;
4646
import org.joda.time.Instant;
4747
import org.slf4j.Logger;
@@ -60,6 +60,12 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
6060
private @Nullable BytesXMLMessage solaceOriginalRecord;
6161
private @Nullable T solaceMappedRecord;
6262

63+
/**
64+
* Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
65+
* Accessed by both reader and checkpointing threads.
66+
*/
67+
private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
68+
6369
/**
6470
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
6571
* {@link SolaceCheckpointMark}.
@@ -130,6 +136,8 @@ public boolean start() {
130136

131137
@Override
132138
public boolean advance() {
139+
finalizeReadyMessages();
140+
133141
BytesXMLMessage receivedXmlMessage;
134142
try {
135143
receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -150,9 +158,27 @@ public boolean advance() {
150158

151159
@Override
152160
public void close() {
161+
finalizeReadyMessages();
153162
sessionServiceCache.invalidate(readerUuid);
154163
}
155164

165+
public void finalizeReadyMessages() {
166+
BytesXMLMessage msg;
167+
while ((msg = safeToAckMessages.poll()) != null) {
168+
try {
169+
msg.ackMessage();
170+
} catch (IllegalStateException e) {
171+
LOG.error(
172+
"SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
173+
msg.getApplicationMessageId(),
174+
msg.getAckMessageId(),
175+
e);
176+
safeToAckMessages.add(msg); // In case the error was transient, might succeed later
177+
break; // Commit is only best effort
178+
}
179+
}
180+
}
181+
156182
@Override
157183
public Instant getWatermark() {
158184
// should be only used by a test receiver
@@ -164,10 +190,9 @@ public Instant getWatermark() {
164190

165191
@Override
166192
public UnboundedSource.CheckpointMark getCheckpointMark() {
167-
168-
ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages);
193+
safeToAckMessages.addAll(receivedMessages);
169194
receivedMessages.clear();
170-
return new SolaceCheckpointMark(bytesXMLMessages);
195+
return new SolaceCheckpointMark(safeToAckMessages);
171196
}
172197

173198
@Override

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java

Lines changed: 3 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
458458
// mark all consumed messages as ready to be acknowledged
459459
CheckpointMark checkpointMark = reader.getCheckpointMark();
460460

461-
// consume 1 more message.
461+
// consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
462462
reader.advance();
463-
assertEquals(0, countAckMessages.get());
463+
assertEquals(4, countAckMessages.get());
464464

465465
// consume 1 more message. No change in the acknowledged messages.
466466
reader.advance();
467-
assertEquals(0, countAckMessages.get());
467+
assertEquals(4, countAckMessages.get());
468468

469469
// acknowledge from the first checkpoint
470470
checkpointMark.finalizeCheckpoint();
@@ -473,73 +473,6 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
473473
assertEquals(4, countAckMessages.get());
474474
}
475475

476-
@Test
477-
public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
478-
AtomicInteger countConsumedMessages = new AtomicInteger(0);
479-
AtomicInteger countAckMessages = new AtomicInteger(0);
480-
481-
// Broker that creates input data
482-
SerializableFunction<Integer, BytesXMLMessage> recordFn =
483-
index -> {
484-
List<BytesXMLMessage> messages = new ArrayList<>();
485-
for (int i = 0; i < 10; i++) {
486-
messages.add(
487-
SolaceDataUtils.getBytesXmlMessage(
488-
"payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
489-
}
490-
countConsumedMessages.incrementAndGet();
491-
return getOrNull(index, messages);
492-
};
493-
494-
SessionServiceFactory fakeSessionServiceFactory =
495-
MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
496-
497-
Read<Record> spec =
498-
getDefaultRead()
499-
.withSessionServiceFactory(fakeSessionServiceFactory)
500-
.withMaxNumConnections(4);
501-
502-
UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
503-
504-
UnboundedReader<Record> reader =
505-
initialSource.createReader(PipelineOptionsFactory.create(), null);
506-
507-
// start the reader and move to the first record
508-
assertTrue(reader.start());
509-
510-
// consume 3 messages (NB: #start() already consumed the first message)
511-
for (int i = 0; i < 3; i++) {
512-
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
513-
}
514-
515-
// #advance() was called, but the messages were not ready to be acknowledged.
516-
assertEquals(0, countAckMessages.get());
517-
518-
// mark all consumed messages as ready to be acknowledged
519-
CheckpointMark checkpointMark = reader.getCheckpointMark();
520-
521-
// data is flushed
522-
523-
// consume 1 more message.
524-
reader.advance();
525-
assertEquals(0, countAckMessages.get());
526-
527-
// consume 1 more message. No change in the acknowledged messages.
528-
reader.advance();
529-
assertEquals(0, countAckMessages.get());
530-
531-
CheckpointMark checkpointMark2 = reader.getCheckpointMark();
532-
// data is prepared for flushing that will be rejected
533-
534-
// acknowledge from the first checkpoint may arrive late
535-
checkpointMark.finalizeCheckpoint();
536-
537-
assertEquals(4, countAckMessages.get());
538-
539-
checkpointMark2.finalizeCheckpoint();
540-
assertEquals(6, countAckMessages.get());
541-
}
542-
543476
@Test
544477
public void testCheckpointMarkSafety() throws Exception {
545478

0 commit comments

Comments
 (0)