Skip to content

Commit 8472d03

Browse files
committed
[FLINK-36441] Fix leak for non-transactional FlinkKafkaProducer.
For non-transactional producers, a notifyCheckpointCompleted after finishOperator will set the transaction inside the 2PCSinkFunction to null, such that on close, the producer is leaked. Since transactional producer stores the transactions in pendingTransactions before that, we just need to fix the cases where we don't preCommit/commit. The easiest solution is to actually close the producer on finishOperator - no new record can arrive.
1 parent ab333a4 commit 8472d03

File tree

2 files changed

+116
-153
lines changed

2 files changed

+116
-153
lines changed

0 commit comments

Comments
 (0)