Skip to content

Commit 029b011

Browse files
author
kevin_tseng
committed
[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching
1 parent 15f2662 commit 029b011

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.clients.producer.RecordMetadata;
2525
import org.apache.kafka.clients.producer.internals.TransactionManager;
2626
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
27+
import org.apache.kafka.common.KafkaException;
2728
import org.apache.kafka.common.errors.ProducerFencedException;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
5556
private volatile boolean inTransaction;
5657
private volatile boolean hasRecordsInTransaction;
5758
private volatile boolean closed;
59+
private volatile boolean flushGated;
5860

5961
public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
6062
super(withTransactionalId(properties, transactionalId));
@@ -74,6 +76,10 @@ private static Properties withTransactionalId(
7476

7577
@Override
7678
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
79+
if (flushGated) {
80+
throw new KafkaException(
81+
"unable to write more records, current producer has been flush gated");
82+
}
7783
if (inTransaction) {
7884
hasRecordsInTransaction = true;
7985
}
@@ -82,10 +88,12 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac
8288

8389
@Override
8490
public void flush() {
91+
flushGated = true;
8592
super.flush();
8693
if (inTransaction) {
8794
flushNewPartitions();
8895
}
96+
flushGated = false;
8997
}
9098

9199
@Override
@@ -109,7 +117,9 @@ public void commitTransaction() throws ProducerFencedException {
109117
checkState(inTransaction, "Transaction was not started");
110118
inTransaction = false;
111119
hasRecordsInTransaction = false;
120+
flushGated = true;
112121
super.commitTransaction();
122+
flushGated = false;
113123
}
114124

115125
public boolean isInTransaction() {
@@ -120,6 +130,10 @@ public boolean hasRecordsInTransaction() {
120130
return hasRecordsInTransaction;
121131
}
122132

133+
public void setFlushGate(boolean closed) {
134+
flushGated = closed;
135+
}
136+
123137
@Override
124138
public void close() {
125139
closed = true;
@@ -398,6 +412,8 @@ public String toString() {
398412
+ inTransaction
399413
+ ", closed="
400414
+ closed
415+
+ ", flushGated="
416+
+ flushGated
401417
+ '}';
402418
}
403419
}

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.kafka.clients.consumer.ConsumerRecord;
4040
import org.apache.kafka.clients.producer.ProducerRecord;
4141
import org.apache.kafka.clients.producer.RecordMetadata;
42+
import org.apache.kafka.common.KafkaException;
4243
import org.apache.kafka.common.errors.ProducerFencedException;
4344
import org.apache.kafka.common.serialization.ByteArraySerializer;
4445
import org.junit.jupiter.api.AfterAll;
@@ -487,6 +488,22 @@ void testAbortOnClose() throws Exception {
487488
}
488489
}
489490

491+
@Test
492+
public void testAtLeastOnceFlushGate() throws Exception {
493+
Properties properties = getKafkaClientConfiguration();
494+
try (final KafkaWriter<Integer> writer =
495+
createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE)) {
496+
writer.getCurrentProducer().setFlushGate(true);
497+
assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
498+
.as("flush gate should throw IOException on another write attempt")
499+
.isInstanceOf(KafkaException.class);
500+
writer.getCurrentProducer().setFlushGate(false);
501+
assertThatCode(() -> writer.write(2, SINK_WRITER_CONTEXT))
502+
.as("the exception is not thrown again")
503+
.doesNotThrowAnyException();
504+
}
505+
}
506+
490507
private void assertKafkaMetricNotPresent(
491508
DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
492509
final Properties config = getKafkaClientConfiguration();

0 commit comments

Comments
 (0)