
Commit 79f1dd6

kevin_tseng committed

[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause data loss during broker issues when not using EXACTLY_ONCE if there is any batching
1 parent: 979791c

File tree: 5 files changed, +72 −8 lines

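For context, the failure mode fixed here arises with a setup like the following sketch: a sink running with AT_LEAST_ONCE (or NONE) whose underlying Kafka producer batches records, so records accepted by send() can still sit in the client-side buffer when a checkpoint completes. This is a hedged, illustrative sketch rather than code from this commit; the broker address, topic name, and batching values are invented.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// With linger.ms/batch.size set, the producer holds records in its in-memory
// buffer between sends; before this fix, buffered records were not handed to
// the committer for a second flush outside of EXACTLY_ONCE mode.
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092") // invented address
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("events") // invented topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setProperty("linger.ms", "100") // enables batching
                .setProperty("batch.size", "65536")
                .build();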

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java

Lines changed: 9 additions & 0 deletions
@@ -54,6 +54,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     @Nullable private String transactionalId;
     private volatile boolean inTransaction;
     private volatile boolean hasRecordsInTransaction;
+    private volatile boolean hasRecordsInBuffer;
     private volatile boolean closed;

     public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
@@ -77,6 +78,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
             hasRecordsInTransaction = true;
         }
+        hasRecordsInBuffer = true;
         return super.send(record, callback);
     }

@@ -86,6 +88,7 @@ public void flush() {
         if (inTransaction) {
             flushNewPartitions();
         }
+        hasRecordsInBuffer = false;
     }

     @Override
@@ -120,6 +123,10 @@ public boolean hasRecordsInTransaction() {
         return hasRecordsInTransaction;
     }

+    public boolean hasRecordsInBuffer() {
+        return hasRecordsInBuffer;
+    }
+
     @Override
     public void close() {
         closed = true;
@@ -133,12 +140,14 @@ public void close() {
             // If this is outside of a transaction, we should be able to cleanly shutdown.
             super.close(Duration.ofHours(1));
         }
+        hasRecordsInBuffer = false;
     }

     @Override
     public void close(Duration timeout) {
         closed = true;
         super.close(timeout);
+        hasRecordsInBuffer = false;
     }

     public boolean isClosed() {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java

Lines changed: 27 additions & 1 deletion
@@ -32,26 +32,45 @@ class KafkaCommittable {
     private final long producerId;
     private final short epoch;
     private final String transactionalId;
+    private final boolean transactional;
     @Nullable private Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer;

     public KafkaCommittable(
             long producerId,
             short epoch,
             String transactionalId,
+            boolean transactional,
             @Nullable Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer) {
         this.producerId = producerId;
         this.epoch = epoch;
         this.transactionalId = transactionalId;
+        this.transactional = transactional;
         this.producer = producer;
     }

+    public KafkaCommittable(
+            long producerId,
+            short epoch,
+            String transactionalId,
+            @Nullable Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer) {
+        this(producerId, epoch, transactionalId, true, producer);
+    }
+
     public static <K, V> KafkaCommittable of(
             FlinkKafkaInternalProducer<K, V> producer,
             Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
+        return KafkaCommittable.of(producer, true, recycler);
+    }
+
+    public static <K, V> KafkaCommittable of(
+            FlinkKafkaInternalProducer<K, V> producer,
+            boolean transactional,
+            Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
         return new KafkaCommittable(
                 producer.getProducerId(),
                 producer.getEpoch(),
                 producer.getTransactionalId(),
+                transactional,
                 new Recyclable<>(producer, recycler));
     }

@@ -67,6 +86,10 @@ public String getTransactionalId() {
         return transactionalId;
     }

+    public boolean isTransactional() {
+        return transactional;
+    }
+
     public Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> getProducer() {
         return Optional.ofNullable(producer);
     }
@@ -76,6 +99,8 @@ public String toString() {
         return "KafkaCommittable{"
                 + "producerId="
                 + producerId
+                + "transactional="
+                + transactional
                 + ", epoch="
                 + epoch
                 + ", transactionalId="
@@ -94,11 +119,12 @@ public boolean equals(Object o) {
         KafkaCommittable that = (KafkaCommittable) o;
         return producerId == that.producerId
                 && epoch == that.epoch
+                && transactional == that.transactional
                 && transactionalId.equals(that.transactionalId);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(producerId, epoch, transactionalId);
+        return Objects.hash(producerId, epoch, transactional, transactionalId);
     }
 }
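A hedged sketch of what the new KafkaCommittable overload expresses, using only identifiers from this diff (FlinkKafkaInternalProducer is package-private, so this only compiles inside org.apache.flink.connector.kafka.sink; the client properties are invented):

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "broker-1:9092"); // invented address
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

// A null transactionalId means the producer runs non-transactionally.
FlinkKafkaInternalProducer<byte[], byte[]> producer =
        new FlinkKafkaInternalProducer<>(props, null);

// New in this commit: transactional = false marks the committable so that
// KafkaCommitter calls flush() but skips commitTransaction().
KafkaCommittable atLeastOnce = KafkaCommittable.of(producer, false, p -> p.close());
assert !atLeastOnce.isTransactional();

// The pre-existing two-argument factory keeps its old meaning (transactional = true).
KafkaCommittable exactlyOnce = KafkaCommittable.of(producer, p -> p.close());
assert exactlyOnce.isTransactional();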

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java

Lines changed: 3 additions & 1 deletion
@@ -70,7 +70,9 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
                     recyclable
                             .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                             .orElseGet(() -> getRecoveryProducer(committable));
-            producer.commitTransaction();
+            if (committable.isTransactional()) {
+                producer.commitTransaction();
+            }
             producer.flush();
             recyclable.ifPresent(Recyclable::close);
         } catch (RetriableException e) {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java

Lines changed: 13 additions & 6 deletions
@@ -209,22 +209,29 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException {

     @Override
     public Collection<KafkaCommittable> prepareCommit() {
-        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
             return Collections.emptyList();
         }

         // only return a KafkaCommittable if the current transaction has been written some data
-        if (currentProducer.hasRecordsInTransaction()) {
+        // or if there's any record in buffer after our first flush
+        if (currentProducer.hasRecordsInTransaction() || currentProducer.hasRecordsInBuffer()) {
             final List<KafkaCommittable> committables =
                     Collections.singletonList(
-                            KafkaCommittable.of(currentProducer, producerPool::add));
+                            KafkaCommittable.of(
+                                    currentProducer,
+                                    deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE,
+                                    producerPool::add));
             LOG.debug("Committing {} committables.", committables);
             return committables;
         }

-        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
-        currentProducer.commitTransaction();
-        producerPool.add(currentProducer);
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            // otherwise, we commit the empty transaction as is (no-op) and just recycle the
+            // producer
+            currentProducer.commitTransaction();
+            producerPool.add(currentProducer);
+        }
         return Collections.emptyList();
     }

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java

Lines changed: 20 additions & 0 deletions
@@ -507,6 +507,26 @@ void testAbortOnClose() throws Exception {
         }
     }

+    @Test
+    public void testAtLeastOnceSecondFlush() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+            writer.flush(false);
+            Collection<KafkaCommittable> commitables = writer.prepareCommit();
+            assertThat(commitables.size()).isEqualTo(0);
+
+            writer.write(2, SINK_WRITER_CONTEXT);
+            commitables = writer.prepareCommit();
+            assertThat(commitables.size()).isGreaterThan(0);
+
+            writer.flush(false);
+            commitables = writer.prepareCommit();
+            assertThat(commitables.size()).isEqualTo(0);
+        }
+    }
+
     private void assertKafkaMetricNotPresent(
             DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
         final Properties config = getKafkaClientConfiguration();
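The test above drives flush() and prepareCommit() by hand; in a running job those calls happen once per checkpoint. A minimal, hedged sketch of such a job follows; buildSink() is a hypothetical helper returning a sink configured like the one sketched under the commit message.

import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each checkpoint triggers KafkaWriter.flush(false) and prepareCommit();
        // with this fix, records still buffered in the producer now yield a
        // non-transactional committable that KafkaCommitter flushes again.
        env.enableCheckpointing(10_000L);

        KafkaSink<String> sink = buildSink(); // hypothetical helper, see sketch above
        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("at-least-once-kafka-sink");
    }
}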
