Skip to content

Commit a84c2b8

Browse files
committed
[FLINK-37613] Fix resource leak during transaction abortion
Transaction abortion can take a while. Apparently, task cleanup may not trigger correctly if this abortion is interrupted (for some unrelated reason), which leaks producers. The fix is to call close() manually during initialization of the EOS writer until the underlying Flink bug is fixed.
1 parent d74a7bd commit a84c2b8

File tree

4 files changed

+78
-5
lines changed

4 files changed

+78
-5
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,19 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
131131

132132
@Override
133133
public void initialize() {
134-
abortLingeringTransactions(
135-
checkNotNull(recoveredStates, "recoveredStates"), restoredCheckpointId + 1);
136-
this.currentProducer = startTransaction(restoredCheckpointId + 1);
134+
// Workaround for FLINK-37612: ensure that we are not leaking producers
135+
try {
136+
abortLingeringTransactions(
137+
checkNotNull(recoveredStates, "recoveredStates"), restoredCheckpointId + 1);
138+
this.currentProducer = startTransaction(restoredCheckpointId + 1);
139+
} catch (Throwable t) {
140+
try {
141+
close();
142+
} catch (Exception e) {
143+
t.addSuppressed(e);
144+
}
145+
throw t;
146+
}
137147
}
138148

139149
private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
139139
"Transaction ({}) encountered error and data has been potentially lost.",
140140
request,
141141
e);
142+
closeCommitterProducer(producer);
142143
// cause failover
143144
request.signalFailedWithUnknownReason(e);
144145
}
@@ -150,6 +151,10 @@ private void handleFailedTransaction(FlinkKafkaInternalProducer<?, ?> producer)
150151
return;
151152
}
152153
backchannel.send(TransactionFinished.erroneously(producer.getTransactionalId()));
154+
closeCommitterProducer(producer);
155+
}
156+
157+
private void closeCommitterProducer(FlinkKafkaInternalProducer<?, ?> producer) {
153158
if (producer == this.committingProducer) {
154159
this.committingProducer.close();
155160
this.committingProducer = null;

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,18 @@ class KafkaWriter<IN>
148148
}
149149

150150
public void initialize() {
151-
this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig);
152-
initKafkaMetrics(this.currentProducer);
151+
// Workaround for FLINK-37612: ensure that we are not leaking producers
152+
try {
153+
this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig);
154+
initKafkaMetrics(this.currentProducer);
155+
} catch (Throwable t) {
156+
try {
157+
close();
158+
} catch (Exception e) {
159+
t.addSuppressed(e);
160+
}
161+
throw t;
162+
}
153163
}
154164

155165
@Override

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@
3434
import org.apache.kafka.common.errors.ProducerFencedException;
3535
import org.apache.kafka.common.serialization.StringDeserializer;
3636
import org.apache.kafka.common.serialization.StringSerializer;
37+
import org.assertj.core.api.AbstractThrowableAssert;
3738
import org.junit.jupiter.api.AfterEach;
3839
import org.junit.jupiter.api.Test;
3940
import org.junit.jupiter.api.extension.ExtendWith;
4041
import org.junit.jupiter.api.extension.RegisterExtension;
4142
import org.junit.jupiter.params.ParameterizedTest;
43+
import org.junit.jupiter.params.provider.CsvSource;
4244
import org.junit.jupiter.params.provider.MethodSource;
4345
import org.testcontainers.containers.KafkaContainer;
4446
import org.testcontainers.junit.jupiter.Container;
@@ -54,6 +56,7 @@
5456
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
5557
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
5658
import static org.assertj.core.api.Assertions.assertThat;
59+
import static org.assertj.core.api.Assertions.assertThatCode;
5760
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5861

5962
@Testcontainers
@@ -174,6 +177,51 @@ void testResetInnerTransactionIfFinalizingTransactionFailed(
174177
}
175178
}
176179

180+
@ParameterizedTest
181+
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
182+
void testDoubleCommitAndAbort(boolean firstCommit, boolean secondCommit) {
183+
final String topic = "test-double-commit-transaction-" + firstCommit + secondCommit;
184+
final String transactionIdPrefix = "testDoubleCommitTransaction-";
185+
final String transactionalId = transactionIdPrefix + "id";
186+
187+
KafkaCommittable committable;
188+
try (FlinkKafkaInternalProducer<String, String> producer =
189+
new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
190+
producer.initTransactions();
191+
producer.beginTransaction();
192+
producer.send(new ProducerRecord<>(topic, "test-value"));
193+
producer.flush();
194+
committable = KafkaCommittable.of(producer);
195+
if (firstCommit) {
196+
producer.commitTransaction();
197+
} else {
198+
producer.abortTransaction();
199+
}
200+
}
201+
202+
try (FlinkKafkaInternalProducer<String, String> resumedProducer =
203+
new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
204+
resumedProducer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
205+
AbstractThrowableAssert<?, ? extends Throwable> secondOp =
206+
assertThatCode(
207+
() -> {
208+
if (secondCommit) {
209+
resumedProducer.commitTransaction();
210+
} else {
211+
resumedProducer.abortTransaction();
212+
}
213+
});
214+
if (firstCommit == secondCommit) {
215+
secondOp.doesNotThrowAnyException();
216+
} else {
217+
secondOp.isInstanceOf(InvalidTxnStateException.class);
218+
}
219+
}
220+
221+
assertNumTransactions(1, transactionIdPrefix);
222+
assertThat(readRecords(topic).count()).isEqualTo(firstCommit ? 1 : 0);
223+
}
224+
177225
private static Properties getProperties() {
178226
Properties properties = new Properties();
179227
properties.put(

0 commit comments

Comments
 (0)