Skip to content

Commit 6f251ec

Browse files
committed
[FLINK-37356] Remove transaction reset
Added in a recent commit that introduces ProducerPool, the method was supposed to cleanup a producer that is still in transactional state for reuse. Since we now don't reuse producers for abnormal states anymore, we can now simply double-commit before reuse avoiding at least some reflection hacks.
1 parent ad3ef74 commit 6f251ec

File tree

2 files changed

+4
-23
lines changed

2 files changed

+4
-23
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -162,26 +162,6 @@ public long getProducerId() {
162162
return (long) getField(producerIdAndEpoch, "producerId");
163163
}
164164

165-
/**
166-
* Sets the transaction manager state to uninitialized.
167-
*
168-
* <p>Can only be called if the producer is in a transaction. Its main purpose is to resolve the
169-
* split brain scenario between writer and committer.
170-
*/
171-
public void resetTransactionState() {
172-
checkState(inTransaction, "Not in transactional state");
173-
this.inTransaction = false;
174-
this.hasRecordsInTransaction = false;
175-
Object transactionManager = getTransactionManager();
176-
synchronized (transactionManager) {
177-
setField(transactionManager, "transactionalId", transactionalId);
178-
setField(
179-
transactionManager,
180-
"currentState",
181-
getTransactionManagerState("UNINITIALIZED"));
182-
}
183-
}
184-
185165
/**
186166
* Sets the transactional id and sets the transaction manager state to uninitialized.
187167
*

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,11 @@ private void recycleProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]
167167
// For non-chained committer, we have a split brain scenario:
168168
// Both the writer and the committer have a producer representing the same transaction.
169169
// The committer producer has finished the transaction while the writer producer is still in
170-
// transaction. In this case, we forcibly complete the transaction, such that we can
171-
// initialize it.
170+
// transaction.
172171
if (producer.isInTransaction()) {
173-
producer.resetTransactionState();
172+
// Here we just double-commit the same transaction which succeeds in all cases
173+
// because the producer shares the same epoch as the committer's producer
174+
producer.commitTransaction();
174175
}
175176
producerPool.add(producer);
176177

0 commit comments

Comments
 (0)