
Commit 2d4f402

[FLINK-34554] Adding pooling name strategy

Reduce the load on the Kafka broker by reusing transaction ids as soon as they have been committed.

1 parent fb28015 · commit 2d4f402
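
For readers evaluating the change, here is a minimal sketch of how the new strategy would be selected on the sink builder. The setTransactionNamingStrategy setter is implied by the sanityCheck change in KafkaSinkBuilder below but is not itself part of this diff, so treat it as an assumption:

    KafkaSink<String> sink =
            KafkaSink.<String>builder()
                    .setBootstrapServers("localhost:9092") // placeholder broker address
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("my-app")
                    // For POOLING, the serializer must implement KafkaDatasetFacetProvider
                    // so the sink knows the target topics (see KafkaSinkBuilder#sanityCheck).
                    .setRecordSerializer(recordSerializer)
                    .setTransactionNamingStrategy(TransactionNamingStrategy.POOLING) // assumed setter
                    .build();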

File tree: 18 files changed, +950 −104 lines

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java

Lines changed: 23 additions & 2 deletions

@@ -18,6 +18,7 @@
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
@@ -62,7 +63,7 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
     // Defines 2 External context Factories, so test cases will be invoked twice using these two
     // kinds of external contexts.
     @TestContext
-    KafkaSinkExternalContextFactory contextFactory =
+    KafkaSinkExternalContextFactory incrementing =
             new KafkaSinkExternalContextFactory(
                     kafka.getContainer(),
                     Arrays.asList(
@@ -77,7 +78,27 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
                             ResourceTestUtils.getResource("flink-connector-testing.jar")
                                     .toAbsolutePath()
                                     .toUri()
-                                    .toURL()));
+                                    .toURL()),
+                    TransactionNamingStrategy.INCREMENTING);
+
+    @TestContext
+    KafkaSinkExternalContextFactory pooling =
+            new KafkaSinkExternalContextFactory(
+                    kafka.getContainer(),
+                    Arrays.asList(
+                            ResourceTestUtils.getResource("kafka-connector.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            ResourceTestUtils.getResource("kafka-clients.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            ResourceTestUtils.getResource("flink-connector-testing.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL()),
+                    TransactionNamingStrategy.POOLING);
 
     public KafkaSinkE2ECase() throws Exception {}
 }

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java

Lines changed: 4 additions & 0 deletions

@@ -200,6 +200,10 @@ public void initialize() {
 
     private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
         namingContext.setNextCheckpointId(checkpointId);
+        namingContext.setOngoingTransactions(
+                producerPool.getOngoingTransactions().stream()
+                        .map(CheckpointTransaction::getTransactionalId)
+                        .collect(Collectors.toSet()));
         FlinkKafkaInternalProducer<byte[], byte[]> producer =
                 transactionNamingStrategy.getTransactionalProducer(namingContext);
         namingContext.setLastCheckpointId(checkpointId);

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java

Lines changed: 38 additions & 13 deletions

@@ -56,6 +56,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                     + "To avoid data loss, the application will restart.";
 
     private final Properties kafkaProducerConfig;
+    private final boolean reusesTransactionalIds;
     private final BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory;
     private final WritableBackchannel<TransactionFinished> backchannel;
     @Nullable private FlinkKafkaInternalProducer<?, ?> committingProducer;
@@ -65,8 +66,10 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
             String transactionalIdPrefix,
             int subtaskId,
             int attemptNumber,
+            boolean reusesTransactionalIds,
             BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory) {
         this.kafkaProducerConfig = kafkaProducerConfig;
+        this.reusesTransactionalIds = reusesTransactionalIds;
         this.producerFactory = producerFactory;
         backchannel =
                 BackchannelFactory.getInstance()
@@ -102,19 +105,7 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
                         "Encountered retriable exception while committing {}.", transactionalId, e);
                 request.retryLater();
             } catch (ProducerFencedException e) {
-                // initTransaction has been called on this transaction before
-                LOG.error(
-                        "Unable to commit transaction ({}) because its producer is already fenced."
-                                + " This means that you either have a different producer with the same '{}' (this is"
-                                + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
-                                + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
-                                + " please consult the Flink documentation for more details.",
-                        request,
-                        ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                        KafkaSink.class.getSimpleName(),
-                        ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
-                        kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
-                        e);
+                logFencedRequest(request, e);
                 handleFailedTransaction(producer);
                 request.signalFailedWithKnownReason(e);
             } catch (InvalidTxnStateException e) {
@@ -146,6 +137,40 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
        }
    }
 
+    private void logFencedRequest(
+            CommitRequest<KafkaCommittable> request, ProducerFencedException e) {
+        if (reusesTransactionalIds) {
+            // If checkpoint 1 succeeds, checkpoint 2 is aborted, and checkpoint 3 may reuse the id
+            // of checkpoint 1. A recovery of checkpoint 1 would show that the transaction has been
+            // fenced.
+            LOG.warn(
+                    "Unable to commit transaction ({}) because its producer is already fenced."
+                            + " If this warning appears as part of the recovery of a checkpoint, it is expected in some cases (e.g., aborted checkpoints in previous attempt)."
+                            + " If it's outside of recovery, this means that you either have a different sink with the same '{}'"
+                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+                            + " please consult the Flink documentation for more details.",
+                    request,
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+                    kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
+                    e);
+        } else {
+            // initTransaction has been called on this transaction before
+            LOG.error(
+                    "Unable to commit transaction ({}) because its producer is already fenced."
+                            + " This means that you either have a different producer with the same '{}' (this is"
+                            + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
+                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+                            + " please consult the Flink documentation for more details.",
+                    request,
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                    KafkaSink.class.getSimpleName(),
+                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+                    kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
+                    e);
+        }
+    }
+
    private void handleFailedTransaction(FlinkKafkaInternalProducer<?, ?> producer) {
        if (producer == null) {
            return;
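
To make the reuse scenario in the new comment concrete, here is a step-by-step reading (illustrative ids, not from the commit):

    // 1. Checkpoint 1 commits transaction "my-app-0-0".
    // 2. Checkpoint 2 is aborted (e.g., the job fails before it completes).
    // 3. Checkpoint 3 reuses the now-free id "my-app-0-0", which bumps the
    //    producer epoch for that id on the broker.
    // 4. If the job later recovers from checkpoint 1 and replays its commit of
    //    "my-app-0-0", the broker answers with ProducerFencedException -- harmless
    //    here, since that transaction was already committed; hence LOG.warn when
    //    reusesTransactionalIds is true, and LOG.error otherwise.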

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 1 addition & 0 deletions

@@ -115,6 +115,7 @@ public Committer<KafkaCommittable> createCommitter(CommitterInitContext context)
                 transactionalIdPrefix,
                 context.getTaskInfo().getIndexOfThisSubtask(),
                 context.getTaskInfo().getAttemptNumber(),
+                transactionNamingStrategy == TransactionNamingStrategy.POOLING,
                 FlinkKafkaInternalProducer::new);
     }
120121

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java

Lines changed: 7 additions & 0 deletions

@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -200,6 +201,12 @@ private void sanityCheck() {
             checkState(
                     transactionalIdPrefix != null,
                     "EXACTLY_ONCE delivery guarantee requires a transactionalIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
+            if (transactionNamingStrategy.getImpl().requiresKnownTopics()) {
+                checkState(
+                        recordSerializer instanceof KafkaDatasetFacetProvider,
+                        "For %s naming strategy, the recordSerializer needs to expose the target topics through implementing KafkaDatasetFacetProvider.",
+                        transactionNamingStrategy);
+            }
         }
     }

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java

Lines changed: 17 additions & 1 deletion

@@ -39,8 +39,24 @@ public enum TransactionNamingStrategy {
      * on the broker.
      *
      * <p>This is exactly the same behavior as in flink-connector-kafka 3.X.
+     *
+     * <p>Switching to this strategy from {@link #POOLING} is not supported.
+     */
+    INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING),
+
+    /**
+     * This strategy reuses transaction names. It is more resource-friendly than {@link
+     * #INCREMENTING} on the Kafka broker.
+     *
+     * <p>It's a new strategy introduced in flink-connector-kafka 4.X. It requires Kafka 3.0+ and
+     * additional read permissions on the target topics.
+     *
+     * <p>The recommended way to switch to this strategy is to first take a checkpoint with
+     * flink-connector-kafka 4.X and then switch to this strategy. This will ensure that no
+     * transactions are left open from the previous run. Alternatively, you can use a savepoint from
+     * any version.
      */
-    INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING);
+    POOLING(TransactionNamingStrategyImpl.POOLING, TransactionAbortStrategyImpl.LISTING);
 
     /**
      * The default transaction naming strategy. Currently set to {@link #INCREMENTING}, which is the

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java

Lines changed: 13 additions & 0 deletions

@@ -20,13 +20,17 @@
 
 import org.apache.flink.annotation.Internal;
 
+import java.util.Collection;
+import java.util.Set;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Implementation of {@link TransactionNamingStrategyImpl.Context}. */
 @Internal
 public class TransactionNamingStrategyContextImpl implements TransactionNamingStrategyImpl.Context {
     private final String transactionalIdPrefix;
     private final int subtaskId;
+    private Set<String> ongoingTransactions;
     private final ProducerPool producerPool;
     private long lastCheckpointId;
     private long nextCheckpointId;
@@ -63,6 +67,15 @@ public void setLastCheckpointId(long lastCheckpointId) {
         this.lastCheckpointId = lastCheckpointId;
     }
 
+    @Override
+    public Set<String> getOngoingTransactions() {
+        return ongoingTransactions;
+    }
+
+    public void setOngoingTransactions(Collection<String> ongoingTransactions) {
+        this.ongoingTransactions = Set.copyOf(ongoingTransactions);
+    }
+
     @Override
     public long getLastCheckpointId() {
         return lastCheckpointId;

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java

Lines changed: 30 additions & 6 deletions

@@ -21,12 +21,14 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 
+import java.util.Set;
+
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Implementation of {@link TransactionNamingStrategy}. */
 @Internal
 public enum TransactionNamingStrategyImpl {
-    INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID) {
+    INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID, false) {
         /**
          * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new
          * transactions will not clash with transactions created during previous checkpoints ({@code
@@ -55,12 +57,36 @@ public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             }
             return context.getProducer(context.buildTransactionalId(expectedCheckpointId));
         }
+    },
+    POOLING(TransactionOwnership.EXPLICIT_BY_WRITER_STATE, true) {
+        @Override
+        public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
+                Context context) {
+            Set<String> usedTransactionalIds = context.getOngoingTransactions();
+            for (int offset = 0; ; offset++) {
+                String transactionalIdCandidate = context.buildTransactionalId(offset);
+                if (usedTransactionalIds.contains(transactionalIdCandidate)) {
+                    continue;
+                }
+                return context.getProducer(transactionalIdCandidate);
+            }
+        }
     };
 
     private final TransactionOwnership ownership;
+    private final boolean requiresKnownTopics;
 
-    TransactionNamingStrategyImpl(TransactionOwnership ownership) {
+    TransactionNamingStrategyImpl(TransactionOwnership ownership, boolean requiresKnownTopics) {
         this.ownership = ownership;
+        this.requiresKnownTopics = requiresKnownTopics;
+    }
+
+    public boolean requiresKnownTopics() {
+        return requiresKnownTopics;
+    }
+
+    public TransactionOwnership getOwnership() {
+        return ownership;
     }
 
     /**
@@ -70,16 +96,14 @@ public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
     public abstract FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             Context context);
 
-    public TransactionOwnership getOwnership() {
-        return ownership;
-    }
-
     /** Context for the transaction naming strategy. */
     public interface Context {
         String buildTransactionalId(long offset);
 
         long getNextCheckpointId();
 
+        Set<String> getOngoingTransactions();
+
         long getLastCheckpointId();
 
         FlinkKafkaInternalProducer<byte[], byte[]> getProducer(String transactionalId);
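
The POOLING branch above simply hands out the lowest free offset. A self-contained sketch of the same selection, assuming the id layout prefix-subtaskId-offset that TransactionalIdFactory produces (see the next file):

    import java.util.Set;

    class PoolingSelectionSketch {
        // Mirrors TransactionNamingStrategyImpl.POOLING: return the first candidate
        // id that is not currently tied to an ongoing transaction.
        static String nextFreeId(String prefix, int subtaskId, Set<String> ongoing) {
            for (int offset = 0; ; offset++) {
                String candidate = prefix + "-" + subtaskId + "-" + offset;
                if (!ongoing.contains(candidate)) {
                    return candidate;
                }
            }
        }

        public static void main(String[] args) {
            // "my-app-0-0" is still committing and "my-app-0-1" is the open
            // transaction, so the next checkpoint gets "my-app-0-2"; once
            // "my-app-0-0" commits, it becomes reusable -- the pooling effect.
            System.out.println(nextFreeId("my-app", 0, Set.of("my-app-0-0", "my-app-0-1")));
        }
    }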

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java

Lines changed: 2 additions & 2 deletions

@@ -44,11 +44,11 @@ public static String buildTransactionalId(
                 + checkpointOffset;
     }
 
-    public static long extractSubtaskId(String name) {
+    public static int extractSubtaskId(String name) {
         int lastSep = name.lastIndexOf("-");
         int secondLastSep = name.lastIndexOf("-", lastSep - 1);
         String subtaskString = name.substring(secondLastSep + 1, lastSep);
-        return Long.parseLong(subtaskString);
+        return Integer.parseInt(subtaskString);
     }
 
     public static String extractPrefix(String name) {
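
The narrowing from long to int matches subtask ids being ints in Flink. A quick round-trip through the factory, assuming the parameter order (prefix, subtaskId, checkpointOffset) of the elided buildTransactionalId signature; since the extractors split on the last two '-' separators, the prefix may itself contain dashes:

    String id = TransactionalIdFactory.buildTransactionalId("my-app", 3, 7); // "my-app-3-7" (assumed layout)
    int subtaskId = TransactionalIdFactory.extractSubtaskId(id);  // 3, now an int rather than a long
    String prefix = TransactionalIdFactory.extractPrefix(id);     // "my-app"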
