
Commit ad3ef74

[FLINK-37356] Recreate producer on error
Reusing a producer that is in an erroneous state can result in further issues outside the planned state machine. These known error cases are handled in KafkaCommitter and currently comprise:

* ProducerFencedException - the commit cannot be completed no matter how often we retry.
* InvalidTxnStateException - in the past mostly caused by empty transactions; we can simply abandon these transactions.
* UnknownProducerIdException - usually happens when recovering after more than 7 days, once the metadata about the transaction has been deleted.
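In all three cases the committer now closes the affected producer and signals the writer over the backchannel so that a fresh producer is used going forward. A minimal sketch of the pattern (not the actual KafkaCommitter code; the class and method names below are illustrative only):

    import java.util.Properties;
    import java.util.function.BiFunction;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.errors.InvalidTxnStateException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.errors.UnknownProducerIdException;

    final class RecreateOnErrorSketch {
        /**
         * Tries to commit; on a known non-retriable error the producer is closed
         * and a fresh one is created via the factory, instead of reusing an
         * instance whose transactional state can no longer be trusted.
         */
        static KafkaProducer<byte[], byte[]> commitOrRecreate(
                KafkaProducer<byte[], byte[]> producer,
                BiFunction<Properties, String, KafkaProducer<byte[], byte[]>> factory,
                Properties config,
                String transactionalId) {
            try {
                producer.commitTransaction();
                return producer;
            } catch (ProducerFencedException
                    | InvalidTxnStateException
                    | UnknownProducerIdException e) {
                // The producer is beyond repair for this transactional id:
                // dispose of it and start over with a new instance.
                producer.close();
                return factory.apply(config, transactionalId);
            }
        }
    }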
1 parent 1dc5c1a commit ad3ef74

File tree

11 files changed: +252 −52 lines


flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 7 additions & 6 deletions
@@ -24,11 +24,12 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:168)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:171)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:167)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:170)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:167)
+Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getCommittingProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:170)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:173)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:169)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:172)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:169)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:152)
@@ -51,4 +52,4 @@ Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsU
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection(org.apache.flink.configuration.ReadableConfig, org.apache.flink.table.types.DataType)> calls method <org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount(org.apache.flink.table.types.logical.LogicalType)> in (KafkaConnectorOptionsUtil.java:569)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(org.apache.flink.table.connector.sink.DynamicTableSink$Context, org.apache.flink.table.connector.format.EncodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSink.java:401)
 Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getFieldGetters(java.util.List, [I)> has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (KafkaDynamicSink.java:0)
-Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:566)
+Method <org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(org.apache.flink.table.connector.source.DynamicTableSource$Context, org.apache.flink.table.connector.format.DecodingFormat, [I, java.lang.String)> calls method <org.apache.flink.table.types.utils.DataTypeUtils.stripRowPrefix(org.apache.flink.table.types.DataType, java.lang.String)> in (KafkaDynamicSource.java:580)

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java

Lines changed: 6 additions & 4 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.connector.kafka.sink.internal.ProducerPool;
 import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
 import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
+import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -70,7 +71,7 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
      * Establishing the channel happens during recovery. Thus, it is only safe to poll in checkpoint
      * related methods.
      */
-    private final ReadableBackchannel<String> backchannel;
+    private final ReadableBackchannel<TransactionFinished> backchannel;
 
     /**
      * Constructor creating a kafka writer.
@@ -164,9 +165,10 @@ public Collection<KafkaCommittable> prepareCommit() {
     @Override
     public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
         // recycle committed producers
-        String finishedTransactionalId;
-        while ((finishedTransactionalId = backchannel.poll()) != null) {
-            producerPool.recycleByTransactionId(finishedTransactionalId);
+        TransactionFinished finishedTransaction;
+        while ((finishedTransaction = backchannel.poll()) != null) {
+            producerPool.recycleByTransactionId(
+                    finishedTransaction.getTransactionId(), finishedTransaction.isSuccess());
         }
         currentProducer = startTransaction(checkpointId + 1);
         return Collections.singletonList(kafkaWriterState);
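The writer now receives a TransactionFinished value over the backchannel instead of a bare transactional id. The class itself is added elsewhere in this commit and is not shown in this excerpt; a minimal sketch consistent with the calls seen in the diffs (successful/erroneously factories, getTransactionId, isSuccess) might look like this, though the committed class may differ in detail:

    // Sketch of the TransactionFinished value type, inferred from its usage in
    // ExactlyOnceKafkaWriter and KafkaCommitter; not the committed implementation.
    public final class TransactionFinished {
        private final String transactionId;
        private final boolean success;

        private TransactionFinished(String transactionId, boolean success) {
            this.transactionId = transactionId;
            this.success = success;
        }

        public static TransactionFinished successful(String transactionId) {
            return new TransactionFinished(transactionId, true);
        }

        public static TransactionFinished erroneously(String transactionId) {
            return new TransactionFinished(transactionId, false);
        }

        public String getTransactionId() {
            return transactionId;
        }

        public boolean isSuccess() {
            return success;
        }
    }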

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java

Lines changed: 31 additions & 10 deletions
@@ -21,6 +21,7 @@
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
@@ -40,6 +41,7 @@
 import java.util.Collection;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.function.BiFunction;
 
 /**
  * Committer implementation for {@link KafkaSink}
@@ -54,25 +56,34 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                     + "To avoid data loss, the application will restart.";
 
     private final Properties kafkaProducerConfig;
-    private final WritableBackchannel<String> backchannel;
+    private final BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory;
+    private final WritableBackchannel<TransactionFinished> backchannel;
     @Nullable private FlinkKafkaInternalProducer<?, ?> committingProducer;
 
     KafkaCommitter(
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
             int subtaskId,
-            int attemptNumber) {
+            int attemptNumber,
+            BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory) {
         this.kafkaProducerConfig = kafkaProducerConfig;
+        this.producerFactory = producerFactory;
         backchannel =
                 BackchannelFactory.getInstance()
                         .getWritableBackchannel(subtaskId, attemptNumber, transactionalIdPrefix);
     }
 
     @VisibleForTesting
-    public WritableBackchannel<String> getBackchannel() {
+    public WritableBackchannel<TransactionFinished> getBackchannel() {
         return backchannel;
     }
 
+    @Nullable
+    @VisibleForTesting
+    FlinkKafkaInternalProducer<?, ?> getCommittingProducer() {
+        return committingProducer;
+    }
+
     @Override
     public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
             throws IOException, InterruptedException {
@@ -81,11 +92,11 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
             final String transactionalId = committable.getTransactionalId();
             LOG.debug("Committing Kafka transaction {}", transactionalId);
             Optional<FlinkKafkaInternalProducer<?, ?>> writerProducer = committable.getProducer();
-            FlinkKafkaInternalProducer<?, ?> producer;
+            FlinkKafkaInternalProducer<?, ?> producer = null;
             try {
                 producer = writerProducer.orElseGet(() -> getProducer(committable));
                 producer.commitTransaction();
-                backchannel.send(committable.getTransactionalId());
+                backchannel.send(TransactionFinished.successful(committable.getTransactionalId()));
             } catch (RetriableException e) {
                 LOG.warn(
                         "Encountered retriable exception while committing {}.", transactionalId, e);
@@ -104,7 +115,7 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
                         ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                         kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
                         e);
-                backchannel.send(committable.getTransactionalId());
+                handleFailedTransaction(producer);
                 request.signalFailedWithKnownReason(e);
             } catch (InvalidTxnStateException e) {
                 // This exception only occurs when aborting after a commit or vice versa.
@@ -114,14 +125,14 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
                                 + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
                         request,
                         e);
-                backchannel.send(committable.getTransactionalId());
+                handleFailedTransaction(producer);
                 request.signalFailedWithKnownReason(e);
             } catch (UnknownProducerIdException e) {
                 LOG.error(
                         "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
                         request,
                         e);
-                backchannel.send(committable.getTransactionalId());
+                handleFailedTransaction(producer);
                 request.signalFailedWithKnownReason(e);
             } catch (Exception e) {
                 LOG.error(
@@ -134,6 +145,17 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
         }
     }
 
+    private void handleFailedTransaction(FlinkKafkaInternalProducer<?, ?> producer) {
+        if (producer == null) {
+            return;
+        }
+        backchannel.send(TransactionFinished.erroneously(producer.getTransactionalId()));
+        if (producer == this.committingProducer) {
+            this.committingProducer.close();
+            this.committingProducer = null;
+        }
+    }
+
     @Override
     public void close() throws IOException {
         try {
@@ -150,8 +172,7 @@ public void close() throws IOException {
     private FlinkKafkaInternalProducer<?, ?> getProducer(KafkaCommittable committable) {
         if (committingProducer == null) {
             committingProducer =
-                    new FlinkKafkaInternalProducer<>(
-                            kafkaProducerConfig, committable.getTransactionalId());
+                    producerFactory.apply(kafkaProducerConfig, committable.getTransactionalId());
         } else {
             committingProducer.setTransactionId(committable.getTransactionalId());
         }
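A side effect of injecting the producer factory is testability: a test can pass a factory that records producer creation and assert that a failed commit leads to a fresh producer. A hypothetical sketch (only the KafkaCommitter constructor signature is taken from this commit; imports and the surrounding test are elided):

    // Hypothetical test wiring; the counting factory is illustrative only.
    AtomicInteger created = new AtomicInteger();
    BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> countingFactory =
            (config, transactionalId) -> {
                created.incrementAndGet();
                return new FlinkKafkaInternalProducer<>(config, transactionalId);
            };
    KafkaCommitter committer =
            new KafkaCommitter(kafkaProducerConfig, "prefix", 0, 0, countingFactory);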

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 3 additions & 1 deletion
@@ -30,6 +30,7 @@
 import org.apache.flink.connector.kafka.lineage.LineageUtil;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
+import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -110,7 +111,8 @@ public Committer<KafkaCommittable> createCommitter(CommitterInitContext context)
                 kafkaProducerConfig,
                 transactionalIdPrefix,
                 context.getTaskInfo().getIndexOfThisSubtask(),
-                context.getTaskInfo().getAttemptNumber());
+                context.getTaskInfo().getAttemptNumber(),
+                FlinkKafkaInternalProducer::new);
     }
 
     @Internal

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java

Lines changed: 3 additions & 3 deletions
@@ -24,13 +24,13 @@ public interface ProducerPool extends AutoCloseable {
      * Notify the pool that a transaction has finished. The producer with the given transactional id
      * can be recycled.
      */
-    void recycleByTransactionId(String transactionalId);
+    void recycleByTransactionId(String transactionalId, boolean success);
 
     /**
      * Get a producer for the given transactional id and checkpoint id. The producer is not recycled
      * until it is passed to the committer, the committer commits the transaction, and {@link
-     * #recycleByTransactionId(String)} is called. Alternatively, the producer can be recycled by
-     * {@link #recycle(FlinkKafkaInternalProducer)}.
+     * #recycleByTransactionId(String, boolean)} is called. Alternatively, the producer can be
+     * recycled by {@link #recycle(FlinkKafkaInternalProducer)}.
      */
     FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             String transactionalId, long checkpointId);

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java

Lines changed: 17 additions & 3 deletions
@@ -32,6 +32,7 @@
 import java.util.Deque;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.function.Consumer;
@@ -105,17 +106,22 @@ public ProducerPoolImpl(
     }
 
     @Override
-    public void recycleByTransactionId(String transactionalId) {
+    public void recycleByTransactionId(String transactionalId, boolean success) {
         ProducerEntry producerEntry = producerByTransactionalId.remove(transactionalId);
         LOG.debug("Transaction {} finished, producer {}", transactionalId, producerEntry);
         if (producerEntry == null) {
             // during recovery, the committer may finish transactions that are not yet ongoing from
             // the writer's perspective
+            // these transactions will eventually be closed by the second half of this method
             return;
         }
 
         transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
-        recycleProducer(producerEntry.getProducer());
+        if (success) {
+            recycleProducer(producerEntry.getProducer());
+        } else {
+            closeProducer(producerEntry.getProducer());
+        }
 
         // In rare cases (only for non-chained committer), some transactions may not be detected to
         // be finished.
@@ -131,12 +137,18 @@ public void recycleByTransactionId(String transactionalId) {
         if (!earlierTransactions.isEmpty()) {
             for (String id : earlierTransactions.values()) {
                 ProducerEntry entry = producerByTransactionalId.remove(id);
-                recycleProducer(entry.getProducer());
+                closeProducer(entry.getProducer());
             }
             earlierTransactions.clear();
         }
     }
 
+    private void closeProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+        if (producer != null) {
+            producer.close();
+        }
+    }
+
     @Override
     public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
         recycleProducer(producer);
@@ -151,6 +163,7 @@ private void recycleProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]
         if (producer == null) {
             return;
         }
+
         // For non-chained committer, we have a split brain scenario:
         // Both the writer and the committer have a producer representing the same transaction.
         // The committer producer has finished the transaction while the writer producer is still in
@@ -212,6 +225,7 @@ public void close() throws Exception {
         closeAll(
                 producerByTransactionalId.values().stream()
                         .map(ProducerEntry::getProducer)
+                        .filter(Objects::nonNull)
                         .collect(Collectors.toList())),
                 producerPool::clear,
                 producerByTransactionalId::clear);
