Skip to content

Commit 201026c

Browse files
committed
SolaceIO data loss
- remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. - add async NACK with configurable deadline so any not finalized messages are rejected and retried.
1 parent f4cbb28 commit 201026c

File tree

7 files changed

+79
-29
lines changed

7 files changed

+79
-29
lines changed

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ public class SolaceIO {
412412
}
413413
};
414414
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
415+
private static final int DEFAULT_ACK_DEADLINE_SECONDS = 60;
415416
private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD =
416417
Duration.standardSeconds(30);
417418
public static final int DEFAULT_WRITER_NUM_SHARDS = 20;
@@ -461,6 +462,7 @@ public static Read<Solace.Record> read() {
461462
.setParseFn(SolaceRecordMapper::map)
462463
.setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
463464
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
465+
.setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS)
464466
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
465467
}
466468

@@ -490,6 +492,7 @@ public static <T> Read<T> read(
490492
.setParseFn(parseFn)
491493
.setTimestampFn(timestampFn)
492494
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
495+
.setAckDeadlineSeconds(DEFAULT_ACK_DEADLINE_SECONDS)
493496
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
494497
}
495498

@@ -587,6 +590,16 @@ public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
587590
return this;
588591
}
589592

593+
/**
594+
* Optional, default: 60. Set to ack deadline after which {@link
595+
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader} will start to reject outstanding
596+
* messages that were not successfully checkpointed.
597+
*/
598+
public Read<T> withAckDeadlineSeconds(int ackDeadlineSeconds) {
599+
configurationBuilder.setAckDeadlineSeconds(ackDeadlineSeconds);
600+
return this;
601+
}
602+
590603
/**
591604
* Set a factory that creates a {@link org.apache.beam.sdk.io.solace.broker.SempClientFactory}.
592605
*
@@ -689,6 +702,8 @@ abstract static class Configuration<T> {
689702

690703
abstract Duration getWatermarkIdleDurationThreshold();
691704

705+
abstract int getAckDeadlineSeconds();
706+
692707
public static <T> Builder<T> builder() {
693708
Builder<T> builder =
694709
new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder<T>();
@@ -719,6 +734,8 @@ abstract Builder<T> setParseFn(
719734

720735
abstract Builder<T> setWatermarkIdleDurationThreshold(Duration idleDurationThreshold);
721736

737+
abstract Builder<T> setAckDeadlineSeconds(int seconds);
738+
722739
abstract Configuration<T> build();
723740
}
724741
}
@@ -756,7 +773,8 @@ public PCollection<T> expand(PBegin input) {
756773
coder,
757774
configuration.getTimestampFn(),
758775
configuration.getWatermarkIdleDurationThreshold(),
759-
configuration.getParseFn())));
776+
configuration.getParseFn(),
777+
configuration.getAckDeadlineSeconds())));
760778
}
761779

762780
@VisibleForTesting

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/JcsmpSessionService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.solacesystems.jcsmp.JCSMPProperties;
3030
import com.solacesystems.jcsmp.JCSMPSession;
3131
import com.solacesystems.jcsmp.Queue;
32+
import com.solacesystems.jcsmp.XMLMessage;
3233
import com.solacesystems.jcsmp.XMLMessageProducer;
3334
import java.io.IOException;
3435
import java.util.Objects;
@@ -143,6 +144,8 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException
143144

144145
ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
145146
flowProperties.setEndpoint(queue);
147+
flowProperties.addRequiredSettlementOutcomes(
148+
XMLMessage.Outcome.FAILED, XMLMessage.Outcome.REJECTED);
146149
flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
147150

148151
EndpointProperties endpointProperties = new EndpointProperties();

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2121

2222
import com.solacesystems.jcsmp.BytesXMLMessage;
23+
import com.solacesystems.jcsmp.JCSMPException;
24+
import com.solacesystems.jcsmp.XMLMessage;
2325
import java.io.IOException;
2426
import java.nio.charset.StandardCharsets;
2527
import java.time.Duration;
@@ -31,7 +33,9 @@
3133
import java.util.concurrent.ExecutionException;
3234
import java.util.concurrent.Executors;
3335
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.ScheduledFuture;
3437
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.TimeoutException;
3539
import org.apache.beam.sdk.io.UnboundedSource;
3640
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
3741
import org.apache.beam.sdk.io.solace.broker.SempClient;
@@ -59,12 +63,8 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
5963
private final SessionServiceFactory sessionServiceFactory;
6064
private @Nullable BytesXMLMessage solaceOriginalRecord;
6165
private @Nullable T solaceMappedRecord;
62-
63-
/**
64-
* Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
65-
* Accessed by both reader and checkpointing threads.
66-
*/
67-
private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
66+
private @Nullable ScheduledFuture<?> nackCallback = null;
67+
private final int ackDeadlineSeconds;
6868

6969
/**
7070
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
@@ -109,6 +109,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
109109
this.sessionServiceFactory = currentSource.getSessionServiceFactory();
110110
this.sempClient = currentSource.getSempClientFactory().create();
111111
this.readerUuid = UUID.randomUUID();
112+
this.ackDeadlineSeconds = currentSource.getAckDeadlineSeconds();
112113
}
113114

114115
private SessionService getSessionService() {
@@ -136,8 +137,6 @@ public boolean start() {
136137

137138
@Override
138139
public boolean advance() {
139-
finalizeReadyMessages();
140-
141140
BytesXMLMessage receivedXmlMessage;
142141
try {
143142
receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -158,23 +157,28 @@ public boolean advance() {
158157

159158
@Override
160159
public void close() {
161-
finalizeReadyMessages();
160+
try {
161+
if (nackCallback != null) {
162+
// wait only for last one to finish, it will mean all the previous one are also done.
163+
nackCallback.get(ackDeadlineSeconds * 2, TimeUnit.SECONDS);
164+
}
165+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
166+
LOG.error("SolaceIO.Read: Failed to wait till nack background thread is finished");
167+
}
162168
sessionServiceCache.invalidate(readerUuid);
163169
}
164170

165-
public void finalizeReadyMessages() {
171+
public void nackMessages(Queue<BytesXMLMessage> checkpoint) {
166172
BytesXMLMessage msg;
167-
while ((msg = safeToAckMessages.poll()) != null) {
173+
while ((msg = checkpoint.poll()) != null) {
168174
try {
169-
msg.ackMessage();
170-
} catch (IllegalStateException e) {
175+
msg.settle(XMLMessage.Outcome.FAILED);
176+
} catch (IllegalStateException | JCSMPException e) {
171177
LOG.error(
172-
"SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
178+
"SolaceIO.Read: failed to nack the message with applicationMessageId={}, ackMessageId={}.",
173179
msg.getApplicationMessageId(),
174180
msg.getAckMessageId(),
175181
e);
176-
safeToAckMessages.add(msg); // In case the error was transient, might succeed later
177-
break; // Commit is only best effort
178182
}
179183
}
180184
}
@@ -190,8 +194,12 @@ public Instant getWatermark() {
190194

191195
@Override
192196
public UnboundedSource.CheckpointMark getCheckpointMark() {
197+
Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
193198
safeToAckMessages.addAll(receivedMessages);
194199
receivedMessages.clear();
200+
nackCallback =
201+
cleanUpThread.schedule(
202+
() -> nackMessages(safeToAckMessages), ackDeadlineSeconds, TimeUnit.SECONDS);
195203
return new SolaceCheckpointMark(safeToAckMessages);
196204
}
197205

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class UnboundedSolaceSource<T> extends UnboundedSource<T, SolaceCheckpoin
4949
private final SerializableFunction<T, Instant> timestampFn;
5050
private final Duration watermarkIdleDurationThreshold;
5151
private final SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn;
52+
private final int ackDeadlineSeconds;
5253

5354
public Queue getQueue() {
5455
return queue;
@@ -70,6 +71,10 @@ public Duration getWatermarkIdleDurationThreshold() {
7071
return watermarkIdleDurationThreshold;
7172
}
7273

74+
public int getAckDeadlineSeconds() {
75+
return ackDeadlineSeconds;
76+
}
77+
7378
public SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn() {
7479
return parseFn;
7580
}
@@ -83,7 +88,8 @@ public UnboundedSolaceSource(
8388
Coder<T> coder,
8489
SerializableFunction<T, Instant> timestampFn,
8590
Duration watermarkIdleDurationThreshold,
86-
SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn) {
91+
SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn,
92+
int ackDeadlineSeconds) {
8793
this.queue = queue;
8894
this.sempClientFactory = sempClientFactory;
8995
this.sessionServiceFactory = sessionServiceFactory;
@@ -93,6 +99,7 @@ public UnboundedSolaceSource(
9399
this.timestampFn = timestampFn;
94100
this.watermarkIdleDurationThreshold = watermarkIdleDurationThreshold;
95101
this.parseFn = parseFn;
102+
this.ackDeadlineSeconds = ackDeadlineSeconds;
96103
}
97104

98105
@Override
@@ -134,7 +141,8 @@ private List<UnboundedSolaceSource<T>> getSolaceSources(
134141
coder,
135142
timestampFn,
136143
watermarkIdleDurationThreshold,
137-
parseFn);
144+
parseFn,
145+
ackDeadlineSeconds);
138146
sourceList.add(source);
139147
}
140148
return sourceList;

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,16 @@ private Read<Record> getDefaultRead() {
7272
return SolaceIO.read()
7373
.from(Solace.Queue.fromName("queue"))
7474
.withSempClientFactory(MockSempClientFactory.getDefaultMock())
75-
.withMaxNumConnections(1);
75+
.withMaxNumConnections(1)
76+
.withAckDeadlineSeconds(1);
7677
}
7778

7879
private Read<Record> getDefaultReadForTopic() {
7980
return SolaceIO.read()
8081
.from(Solace.Topic.fromName("topic"))
8182
.withSempClientFactory(MockSempClientFactory.getDefaultMock())
82-
.withMaxNumConnections(1);
83+
.withMaxNumConnections(1)
84+
.withAckDeadlineSeconds(1);
8385
}
8486

8587
private static BytesXMLMessage getOrNull(Integer index, List<BytesXMLMessage> messages) {
@@ -97,7 +99,8 @@ private static UnboundedSolaceSource<Record> getSource(Read<Record> spec, TestPi
9799
spec.inferCoder(pipeline, configuration.getTypeDescriptor()),
98100
configuration.getTimestampFn(),
99101
configuration.getWatermarkIdleDurationThreshold(),
100-
configuration.getParseFn());
102+
configuration.getParseFn(),
103+
configuration.getAckDeadlineSeconds());
101104
}
102105

103106
@Test
@@ -437,7 +440,8 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
437440
Read<Record> spec =
438441
getDefaultRead()
439442
.withSessionServiceFactory(fakeSessionServiceFactory)
440-
.withMaxNumConnections(4);
443+
.withMaxNumConnections(4)
444+
.withAckDeadlineSeconds(1);
441445

442446
UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
443447

@@ -458,19 +462,26 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
458462
// mark all consumed messages as ready to be acknowledged
459463
CheckpointMark checkpointMark = reader.getCheckpointMark();
460464

461-
// consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
465+
// consume 1 more message. This will still not call ack.
462466
reader.advance();
463-
assertEquals(4, countAckMessages.get());
467+
assertEquals(0, countAckMessages.get());
464468

465469
// consume 1 more message. No change in the acknowledged messages.
466470
reader.advance();
467-
assertEquals(4, countAckMessages.get());
471+
assertEquals(0, countAckMessages.get());
468472

469473
// acknowledge from the first checkpoint
470474
checkpointMark.finalizeCheckpoint();
471475
// No change in the acknowledged messages, because they were acknowledged in the #advance()
472476
// method.
473477
assertEquals(4, countAckMessages.get());
478+
479+
checkpointMark = reader.getCheckpointMark();
480+
481+
Thread.sleep(2000);
482+
checkpointMark.finalizeCheckpoint();
483+
// messages were nacked, no chane in expected values
484+
assertEquals(4, countAckMessages.get());
474485
}
475486

476487
@Test
@@ -542,7 +553,7 @@ public void testCheckpointMarkSafety() throws Exception {
542553
@Test
543554
public void testDefaultCoder() {
544555
Coder<SolaceCheckpointMark> coder =
545-
new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null)
556+
new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null, 1)
546557
.getCheckpointMarkCoder();
547558
CoderProperties.coderSerializable(coder);
548559
}

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOCustomSessionServiceFactoryIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public void test01writeAndReadWithCustomSessionServiceFactory() {
102102
SolaceIO.read()
103103
.from(Queue.fromName(QUEUE_NAME))
104104
.withMaxNumConnections(1)
105+
.withAckDeadlineSeconds(10)
105106
.withDeduplicateRecords(true)
106107
.withSempClientFactory(
107108
BasicAuthSempClientFactory.builder()
@@ -118,7 +119,7 @@ public void test01writeAndReadWithCustomSessionServiceFactory() {
118119
PipelineResult pipelineResult = writerPipeline.run();
119120
// We need enough time for Beam to pull all messages from the queue, but we need a timeout too,
120121
// as the Read connector will keep attempting to read forever.
121-
pipelineResult.waitUntilFinish(Duration.standardSeconds(15));
122+
pipelineResult.waitUntilFinish(Duration.standardMinutes(2));
122123

123124
MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE);
124125
long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT);

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOMultipleSempIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void test01writeAndReadWithMultipleSempClientFactory() {
107107
SolaceIO.read()
108108
.from(Queue.fromName(QUEUE_NAME))
109109
.withMaxNumConnections(1)
110+
.withAckDeadlineSeconds(10)
110111
.withDeduplicateRecords(true)
111112
.withSempClientFactory(
112113
BasicAuthMultipleSempClientFactory.builder()
@@ -131,7 +132,7 @@ public void test01writeAndReadWithMultipleSempClientFactory() {
131132
PipelineResult pipelineResult = writerPipeline.run();
132133
// We need enough time for Beam to pull all messages from the queue, but we need a timeout too,
133134
// as the Read connector will keep attempting to read forever.
134-
pipelineResult.waitUntilFinish(Duration.standardSeconds(15));
135+
pipelineResult.waitUntilFinish(Duration.standardMinutes(2));
135136

136137
MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE);
137138
long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT);

0 commit comments

Comments
 (0)