Skip to content

Commit 68c9989

Browse files
committed
SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry.
1 parent ece2beb commit 68c9989

File tree

2 files changed

+75
-24
lines changed

2 files changed

+75
-24
lines changed

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
6464
* Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
6565
* Accessed by both reader and checkpointing threads.
6666
*/
67-
private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
67+
private Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
6868

6969
/**
7070
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
@@ -136,8 +136,6 @@ public boolean start() {
136136

137137
@Override
138138
public boolean advance() {
139-
finalizeReadyMessages();
140-
141139
BytesXMLMessage receivedXmlMessage;
142140
try {
143141
receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -158,25 +156,10 @@ public boolean advance() {
158156

159157
@Override
160158
public void close() {
161-
finalizeReadyMessages();
162-
sessionServiceCache.invalidate(readerUuid);
163-
}
164-
165-
public void finalizeReadyMessages() {
166-
BytesXMLMessage msg;
167-
while ((msg = safeToAckMessages.poll()) != null) {
168-
try {
169-
msg.ackMessage();
170-
} catch (IllegalStateException e) {
171-
LOG.error(
172-
"SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
173-
msg.getApplicationMessageId(),
174-
msg.getAckMessageId(),
175-
e);
176-
safeToAckMessages.add(msg); // In case the error was transient, might succeed later
177-
break; // Commit is only best effort
178-
}
159+
if (!safeToAckMessages.isEmpty()) {
160+
LOG.error("SolaceIO.Read: reader is closing without ack-ing messages.");
179161
}
162+
sessionServiceCache.invalidate(readerUuid);
180163
}
181164

182165
@Override
@@ -190,6 +173,7 @@ public Instant getWatermark() {
190173

191174
@Override
192175
public UnboundedSource.CheckpointMark getCheckpointMark() {
176+
safeToAckMessages = new ConcurrentLinkedQueue<>();
193177
safeToAckMessages.addAll(receivedMessages);
194178
receivedMessages.clear();
195179
return new SolaceCheckpointMark(safeToAckMessages);

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
458458
// mark all consumed messages as ready to be acknowledged
459459
CheckpointMark checkpointMark = reader.getCheckpointMark();
460460

461-
// consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
461+
// consume 1 more message.
462462
reader.advance();
463-
assertEquals(4, countAckMessages.get());
463+
assertEquals(0, countAckMessages.get());
464464

465465
// consume 1 more message. No change in the acknowledged messages.
466466
reader.advance();
467-
assertEquals(4, countAckMessages.get());
467+
assertEquals(0, countAckMessages.get());
468468

469469
// acknowledge from the first checkpoint
470470
checkpointMark.finalizeCheckpoint();
@@ -473,6 +473,73 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
473473
assertEquals(4, countAckMessages.get());
474474
}
475475

476+
@Test
477+
public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
478+
AtomicInteger countConsumedMessages = new AtomicInteger(0);
479+
AtomicInteger countAckMessages = new AtomicInteger(0);
480+
481+
// Broker that creates input data
482+
SerializableFunction<Integer, BytesXMLMessage> recordFn =
483+
index -> {
484+
List<BytesXMLMessage> messages = new ArrayList<>();
485+
for (int i = 0; i < 10; i++) {
486+
messages.add(
487+
SolaceDataUtils.getBytesXmlMessage(
488+
"payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
489+
}
490+
countConsumedMessages.incrementAndGet();
491+
return getOrNull(index, messages);
492+
};
493+
494+
SessionServiceFactory fakeSessionServiceFactory =
495+
MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
496+
497+
Read<Record> spec =
498+
getDefaultRead()
499+
.withSessionServiceFactory(fakeSessionServiceFactory)
500+
.withMaxNumConnections(4);
501+
502+
UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
503+
504+
UnboundedReader<Record> reader =
505+
initialSource.createReader(PipelineOptionsFactory.create(), null);
506+
507+
// start the reader and move to the first record
508+
assertTrue(reader.start());
509+
510+
// consume 3 messages (NB: #start() already consumed the first message)
511+
for (int i = 0; i < 3; i++) {
512+
assertTrue(String.format("Failed at %d-th message", i), reader.advance());
513+
}
514+
515+
// #advance() was called, but the messages were not ready to be acknowledged.
516+
assertEquals(0, countAckMessages.get());
517+
518+
// mark all consumed messages as ready to be acknowledged
519+
CheckpointMark checkpointMark = reader.getCheckpointMark();
520+
521+
// data is flushed
522+
523+
// consume 1 more message.
524+
reader.advance();
525+
assertEquals(0, countAckMessages.get());
526+
527+
// consume 1 more message. No change in the acknowledged messages.
528+
reader.advance();
529+
assertEquals(0, countAckMessages.get());
530+
531+
CheckpointMark checkpointMark2 = reader.getCheckpointMark();
532+
// data is prepared for flushing that will be rejected
533+
534+
// acknowledge from the first checkpoint may arrive late
535+
checkpointMark.finalizeCheckpoint();
536+
537+
assertEquals(4, countAckMessages.get());
538+
539+
checkpointMark2.finalizeCheckpoint();
540+
assertEquals(6, countAckMessages.get());
541+
}
542+
476543
@Test
477544
public void testCheckpointMarkSafety() throws Exception {
478545

0 commit comments

Comments
 (0)