
Commit 0d81bc8

Committed by: kevin_tseng
[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause data loss during broker issue when not using EXACTLY_ONCE if there's any batching

1 parent: 15f2662
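What the change does: every record handed to send() now increments an AtomicLong, and a wrapping TrackingCallback decrements it when the broker acknowledges or fails the record; flush() then throws if the count is nonzero. This matters because KafkaProducer reports send errors only through callbacks and futures, so in the broker-issue scenario the commit title describes, a batched record could be lost while flush() still returned normally and a checkpoint committed on top of it. Below is a minimal standalone sketch of the same guard written against only the public Kafka client API; `GuardedProducer` is an illustrative name, not part of the patch.

```java
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class GuardedProducer<K, V> {

    private final KafkaProducer<K, V> producer;
    // Records handed to send() whose callbacks have not yet run.
    private final AtomicLong pendingRecords = new AtomicLong(0);

    public GuardedProducer(Properties properties) {
        this.producer = new KafkaProducer<>(properties);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        pendingRecords.incrementAndGet();
        // Wrap the caller's callback so every completion, success or failure,
        // decrements the counter before the original callback runs.
        return producer.send(record, (metadata, exception) -> {
            pendingRecords.decrementAndGet();
            if (callback != null) {
                callback.onCompletion(metadata, exception);
            }
        });
    }

    // flush() blocks until in-flight sends complete, but errors surface only
    // through callbacks and futures. Checking the counter afterwards turns a
    // record that was silently dropped (callback never invoked) into a hard
    // failure instead of a successfully "flushed" write.
    public void flush() {
        producer.flush();
        long pending = pendingRecords.get();
        if (pending != 0) {
            throw new IllegalStateException(
                    "Pending record count must be zero at this point: " + pending);
        }
    }
}
```

In the patch itself this logic sits directly in FlinkKafkaInternalProducer's overridden send() and flush() (first file below), so the writer's at-least-once flush picks up the guard with no caller changes.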

File tree: 2 files changed (47 additions, 1 deletion)

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java

Lines changed: 32 additions & 1 deletion
@@ -37,6 +37,7 @@
 import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;

 import static org.apache.flink.util.Preconditions.checkState;

@@ -55,6 +56,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     private volatile boolean inTransaction;
     private volatile boolean hasRecordsInTransaction;
     private volatile boolean closed;
+    private final AtomicLong pendingRecords = new AtomicLong(0);

     public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
         super(withTransactionalId(properties, transactionalId));
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
         return props;
     }

+    public long getPendingRecordsCount() {
+        return pendingRecords.get();
+    }
+
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
             hasRecordsInTransaction = true;
         }
-        return super.send(record, callback);
+        pendingRecords.incrementAndGet();
+        return super.send(record, new TrackingCallback(callback));
     }

     @Override
@@ -86,6 +93,11 @@ public void flush() {
         if (inTransaction) {
             flushNewPartitions();
         }
+        final long pendingRecordsCount = pendingRecords.get();
+        if (pendingRecordsCount != 0) {
+            throw new IllegalStateException(
+                    "Pending record count must be zero at this point: " + pendingRecordsCount);
+        }
     }

     @Override
@@ -396,8 +408,27 @@ public String toString() {
                 + transactionalId
                 + "', inTransaction="
                 + inTransaction
+                + ", pendingRecords="
+                + pendingRecords.get()
                 + ", closed="
                 + closed
                 + '}';
     }
+
+    public class TrackingCallback implements Callback {
+
+        private final Callback actualCallback;
+
+        public TrackingCallback(final Callback actualCallback) {
+            this.actualCallback = actualCallback;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
+            pendingRecords.decrementAndGet();
+            if (actualCallback != null) {
+                actualCallback.onCompletion(recordMetadata, e);
+            }
+        }
+    }
 }
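Two details of TrackingCallback are worth noting. It decrements the counter before delegating, so an actualCallback that throws cannot leave the count inflated, and it decrements on failed sends as well: a send that completes with an exception still brings the count back to zero, with the error reported through the delegated callback as before. The flush() guard therefore only trips for records whose callbacks never ran at all, e.g. records still sitting in the client's batch accumulator during a broker outage. A hedged sketch of that distinction using MockProducer, the test double shipped in the kafka-clients test artifact (class and topic names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PendingCountSemantics {
    public static void main(String[] args) {
        // autoComplete=false defers each send's callback until completed by hand.
        MockProducer<String, String> producer =
                new MockProducer<>(false, new StringSerializer(), new StringSerializer());
        AtomicLong pendingRecords = new AtomicLong();

        pendingRecords.incrementAndGet();
        producer.send(
                new ProducerRecord<>("topic", "value"),
                (metadata, exception) -> pendingRecords.decrementAndGet());

        // Callback has not run yet: this is the state the new flush() guard rejects.
        System.out.println(pendingRecords.get()); // prints 1

        // A failed send still invokes the callback, so the count returns to zero;
        // the failure surfaces through the callback, not the pending-count check.
        producer.errorNext(new RuntimeException("simulated broker issue"));
        System.out.println(pendingRecords.get()); // prints 0
    }
}
```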

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java

Lines changed: 15 additions & 0 deletions
@@ -487,6 +487,21 @@ void testAbortOnClose() throws Exception {
         }
     }

+    @Test
+    public void testAtLeastOnceFlushGate() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+            assertThat(writer.getCurrentProducer().getPendingRecordsCount())
+                    .as("should have one pending record")
+                    .isEqualTo(1);
+            assertThatCode(() -> writer.flush(false))
+                    .as("should not throw exception")
+                    .doesNotThrowAnyException();
+        }
+    }
+
     private void assertKafkaMetricNotPresent(
             DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
         final Properties config = getKafkaClientConfiguration();
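The new test covers only the happy path: one pending record, then a clean flush(false). A complementary failure-path assertion could look like the sketch below; `stuckProducer` is a hypothetical producer holding a record whose callback never fired (the FLINK-33545 broker-issue scenario), not a fixture provided by this patch.

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@Test
void testFlushFailsWithUnacknowledgedRecords() {
    // stuckProducer (hypothetical): a FlinkKafkaInternalProducer with one record
    // sent but never acknowledged, so pendingRecords is still nonzero at flush.
    assertThatThrownBy(stuckProducer::flush)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageContaining("Pending record count must be zero");
}
```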
