
Commit 4606010

[FLINK-37622] Fix KafkaSink in BATCH
Since BATCH execution never calls snapshotState(), the producer is never rotated in batch mode. Hence, when the producer is closed at the end of the write failover region, it also aborts the ongoing transaction, and the committer in the next region then fails. This commit generalizes the client-side transaction state machine and adds a new state for precommitted transactions. The writer then aborts only those transactions that have not been precommitted.
1 parent 2b199b0 commit 4606010
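
For orientation, here is a minimal, self-contained sketch of the transaction lifecycle this commit introduces. The enum mirrors `TransactionState` from the diff below; the class and its `main` driver are illustrative stand-ins, with all Kafka plumbing stubbed out.

```java
// Illustrative sketch only: mirrors the state transitions added in
// FlinkKafkaInternalProducer; everything else is a stand-in.
public class TransactionLifecycleSketch {

    // Mirrors FlinkKafkaInternalProducer.TransactionState from the diff below.
    enum TransactionState { NOT_IN_TRANSACTION, IN_TRANSACTION, DATA_IN_TRANSACTION, PRECOMMITTED }

    private TransactionState state = TransactionState.NOT_IN_TRANSACTION;

    void beginTransaction() {
        state = TransactionState.IN_TRANSACTION;
    }

    void send() {
        // The first record makes the transaction known to the broker.
        if (state != TransactionState.NOT_IN_TRANSACTION) {
            state = TransactionState.DATA_IN_TRANSACTION;
        }
    }

    void precommitTransaction() {
        // Called from ExactlyOnceKafkaWriter#prepareCommit once the committable is handed off.
        state = TransactionState.PRECOMMITTED;
    }

    public static void main(String[] args) {
        TransactionLifecycleSketch producer = new TransactionLifecycleSketch();
        producer.beginTransaction();     // IN_TRANSACTION
        producer.send();                 // DATA_IN_TRANSACTION
        producer.precommitTransaction(); // PRECOMMITTED: must survive close() in BATCH
        System.out.println(producer.state); // prints PRECOMMITTED
    }
}
```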

File tree

3 files changed: +74 -25 lines

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java

Lines changed: 7 additions & 3 deletions

```diff
@@ -217,6 +217,7 @@ public Collection<KafkaCommittable> prepareCommit() {
         if (currentProducer.hasRecordsInTransaction()) {
             KafkaCommittable committable = KafkaCommittable.of(currentProducer);
             LOG.debug("Prepare {}.", committable);
+            currentProducer.precommitTransaction();
             return Collections.singletonList(committable);
         }
 
@@ -269,9 +270,12 @@ public void close() throws Exception {
     }
 
     private void abortCurrentProducer() {
-        // only abort if the transaction is known to the broker (needs to have at least one record
-        // sent)
-        if (currentProducer.isInTransaction() && currentProducer.hasRecordsInTransaction()) {
+        // Abort only if the transaction is known to the broker (at least one record sent).
+        // The producer may be in the precommitted state if we run in batch; aborting would
+        // mean data loss. Note that this may leave the transaction open if an error happens
+        // in streaming between #prepareCommit and #snapshotState. However, aborting here is
+        // best-effort anyway, and recovery will clean up the transaction.
+        if (currentProducer.hasRecordsInTransaction()) {
             try {
                 currentProducer.abortTransaction();
             } catch (ProducerFencedException e) {
```
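
A hedged note on why the changed condition matters: under the new state machine, `hasRecordsInTransaction()` is true only in `DATA_IN_TRANSACTION`, so a precommitted transaction is no longer aborted on close. The toy comparison below maps the old two-boolean scheme onto the new states (the old code had no explicit `PRECOMMITTED` state; after `prepareCommit()` both booleans simply stayed true, which is the data-loss case in BATCH):

```java
// Toy comparison: per state, would abortCurrentProducer() abort the transaction?
public final class AbortDecisionSketch {

    enum TransactionState { NOT_IN_TRANSACTION, IN_TRANSACTION, DATA_IN_TRANSACTION, PRECOMMITTED }

    // Old check: isInTransaction() && hasRecordsInTransaction(). Both flags were
    // true from the first send() until commit/abort -- including after prepareCommit().
    static boolean oldCheck(TransactionState s) {
        boolean inTransaction = s != TransactionState.NOT_IN_TRANSACTION;
        boolean hasRecords =
                s == TransactionState.DATA_IN_TRANSACTION || s == TransactionState.PRECOMMITTED;
        return inTransaction && hasRecords;
    }

    // New check: hasRecordsInTransaction() alone, which excludes PRECOMMITTED.
    static boolean newCheck(TransactionState s) {
        return s == TransactionState.DATA_IN_TRANSACTION;
    }

    public static void main(String[] args) {
        for (TransactionState s : TransactionState.values()) {
            // Only PRECOMMITTED differs: old=true (aborted, data lost), new=false (kept open).
            System.out.printf("%-18s old=%-5s new=%s%n", s, oldCheck(s), newCheck(s));
        }
    }
}
```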

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java

Lines changed: 32 additions & 20 deletions

```diff
@@ -55,8 +55,7 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
 
     @Nullable private String transactionalId;
-    private volatile boolean inTransaction;
-    private volatile boolean hasRecordsInTransaction;
+    private volatile TransactionState transactionState = TransactionState.NOT_IN_TRANSACTION;
     private volatile boolean closed;
 
     public FlinkKafkaInternalProducer(Properties properties) {
@@ -79,16 +78,16 @@ private static Properties withTransactionalId(Properties properties, String tran
 
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-        if (inTransaction) {
-            hasRecordsInTransaction = true;
+        if (isInTransaction()) {
+            transactionState = TransactionState.DATA_IN_TRANSACTION;
         }
         return super.send(record, callback);
     }
 
     @Override
     public void flush() {
         super.flush();
-        if (inTransaction) {
+        if (isInTransaction()) {
             flushNewPartitions();
         }
     }
@@ -97,33 +96,40 @@ public void flush() {
     public void beginTransaction() throws ProducerFencedException {
         super.beginTransaction();
         LOG.debug("beginTransaction {}", transactionalId);
-        inTransaction = true;
+        transactionState = TransactionState.IN_TRANSACTION;
     }
 
     @Override
     public void abortTransaction() throws ProducerFencedException {
         LOG.debug("abortTransaction {}", transactionalId);
-        checkState(inTransaction, "Transaction was not started");
-        inTransaction = false;
-        hasRecordsInTransaction = false;
+        checkState(isInTransaction(), "Transaction was not started");
+        transactionState = TransactionState.NOT_IN_TRANSACTION;
         super.abortTransaction();
     }
 
     @Override
     public void commitTransaction() throws ProducerFencedException {
         LOG.debug("commitTransaction {}", transactionalId);
-        checkState(inTransaction, "Transaction was not started");
-        inTransaction = false;
-        hasRecordsInTransaction = false;
+        checkState(isInTransaction(), "Transaction was not started");
+        transactionState = TransactionState.NOT_IN_TRANSACTION;
         super.commitTransaction();
     }
 
     public boolean isInTransaction() {
-        return inTransaction;
+        return transactionState != TransactionState.NOT_IN_TRANSACTION;
     }
 
     public boolean hasRecordsInTransaction() {
-        return hasRecordsInTransaction;
+        return transactionState == TransactionState.DATA_IN_TRANSACTION;
+    }
+
+    public boolean isPrecommitted() {
+        return transactionState == TransactionState.PRECOMMITTED;
+    }
+
+    public void precommitTransaction() {
+        checkState(hasRecordsInTransaction(), "Transaction was not started");
+        transactionState = TransactionState.PRECOMMITTED;
     }
 
     @Override
@@ -172,7 +178,7 @@ public long getProducerId() {
      */
     public void setTransactionId(String transactionalId) {
         checkState(
-                !inTransaction,
+                !isInTransaction(),
                 String.format("Another transaction %s is still open.", transactionalId));
         LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
         this.transactionalId = transactionalId;
@@ -292,7 +298,7 @@ private static Object getField(Object object, Class<?> clazz, String fieldName)
      * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630
      */
     public void resumeTransaction(long producerId, short epoch) {
-        checkState(!inTransaction, "Already in transaction %s", transactionalId);
+        checkState(!isInTransaction(), "Already in transaction %s", transactionalId);
         checkState(
                 producerId >= 0 && epoch >= 0,
                 "Incorrect values for producerId %s and epoch %s",
@@ -329,9 +335,8 @@ public void resumeTransaction(long producerId, short epoch) {
             // when we create recovery producers to resume transactions and commit
             // them, we should always set this flag.
             setField(transactionManager, "transactionStarted", true);
-            this.inTransaction = true;
-            this.hasRecordsInTransaction = true;
         }
+        this.transactionState = TransactionState.PRECOMMITTED;
     }
 
     private static Object createProducerIdAndEpoch(long producerId, short epoch) {
@@ -391,7 +396,14 @@ private static void transitionTransactionManagerStateTo(
     @Override
     public String toString() {
         return String.format(
-                "FlinkKafkaInternalProducer@%d{transactionalId='%s', inTransaction=%s, closed=%s}",
-                System.identityHashCode(this), transactionalId, inTransaction, closed);
+                "FlinkKafkaInternalProducer@%d{transactionalId='%s', transactionState=%s, closed=%s}",
+                System.identityHashCode(this), transactionalId, transactionState, closed);
+    }
+
+    enum TransactionState {
+        NOT_IN_TRANSACTION,
+        IN_TRANSACTION,
+        DATA_IN_TRANSACTION,
+        PRECOMMITTED,
     }
 }
```
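
To see the new states exercised end to end, here is a hedged sketch of the writer-to-committer handover that the commit enables in BATCH mode. It assumes a Kafka broker at localhost:9092, omits error handling and Flink's actual recovery wiring, and `getEpoch()` is assumed to exist alongside the `getProducerId()` shown in the diff above:

```java
import java.util.Properties;

import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hedged sketch of the BATCH handover between failover regions.
public class PrecommitHandoverSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("transactional.id", "kafka-sink-0");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Writer side (first failover region): write data, then precommit.
        FlinkKafkaInternalProducer<byte[], byte[]> writer =
                new FlinkKafkaInternalProducer<>(props);
        writer.initTransactions();
        writer.beginTransaction();                                      // IN_TRANSACTION
        writer.send(new ProducerRecord<>("topic", "value".getBytes())); // DATA_IN_TRANSACTION
        writer.flush();
        writer.precommitTransaction();                                  // PRECOMMITTED
        long producerId = writer.getProducerId();
        short epoch = writer.getEpoch(); // assumed accessor, mirroring getProducerId()
        writer.close(); // closing does not abort the open transaction on the broker

        // Committer side (next failover region): resume and commit.
        FlinkKafkaInternalProducer<byte[], byte[]> committer =
                new FlinkKafkaInternalProducer<>(props);
        committer.resumeTransaction(producerId, epoch); // lands in PRECOMMITTED
        committer.commitTransaction();                  // NOT_IN_TRANSACTION
        committer.close();
    }
}
```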

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java

Lines changed: 35 additions & 2 deletions

```diff
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -239,6 +240,31 @@ public void testWriteRecordsToKafkaWithExactlyOnceGuarantee(
         writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, namingStrategy, chained);
     }
 
+    @Test
+    public void testWriteRecordsToKafkaWithExactlyOnceGuaranteeBatch() throws Exception {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(createConfiguration(1));
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        int count = 1000;
+        final DataStream<Long> source = createSource(env, false, count);
+        source.sinkTo(
+                new KafkaSinkBuilder<Long>()
+                        .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build())
+                        .setTransactionalIdPrefix("kafka-sink")
+                        .build());
+        env.execute();
+
+        final List<Long> collectedRecords =
+                deserializeValues(drainAllRecordsFromTopic(topic, true));
+        assertThat(collectedRecords).hasSize(count);
+    }
+
     static Stream<Arguments> getEOSParameters() {
         return Arrays.stream(TransactionNamingStrategy.values())
                 .flatMap(
@@ -659,11 +685,18 @@ private void writeRecordsToKafka(
     }
 
     private DataStream<Long> createThrottlingSource(StreamExecutionEnvironment env) {
+        return createSource(env, true, 1000);
+    }
+
+    private DataStream<Long> createSource(
+            StreamExecutionEnvironment env, boolean throttled, int count) {
         return env.fromSource(
                 new DataGeneratorSource<>(
                         value -> value,
-                        1000,
-                        new ThrottleUntilFirstCheckpointStrategy(),
+                        count,
+                        throttled
+                                ? new ThrottleUntilFirstCheckpointStrategy()
+                                : RateLimiterStrategy.noOp(),
                         BasicTypeInfo.LONG_TYPE_INFO),
                 WatermarkStrategy.noWatermarks(),
                 "Generator Source");
```
