
Commit d43fcbd

[FLINK-34554] Introduce transaction strategies
Move the existing way of naming and aborting transactions into dedicated strategies. The next commit will add new strategies.
1 parent 45df794

17 files changed: +586 additions, -165 deletions

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java

Lines changed: 58 additions & 15 deletions
@@ -26,8 +26,11 @@
 import org.apache.flink.connector.kafka.sink.internal.ProducerPool;
 import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
 import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyContextImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
-import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
+import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyContextImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.FlinkRuntimeException;

@@ -54,7 +57,17 @@
  */
 class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
     private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class);
+    /**
+     * Prefix for the transactional id. Must be unique across all sinks writing to the same broker.
+     */
     private final String transactionalIdPrefix;
+    /**
+     * Strategy to abort lingering transactions from previous executions during writer
+     * initialization.
+     */
+    private final TransactionAbortStrategyImpl transactionAbortStrategy;
+    /** Strategy to name transactions. */
+    private final TransactionNamingStrategyImpl transactionNamingStrategy;

     private final KafkaWriterState kafkaWriterState;
     private final Collection<KafkaWriterState> recoveredStates;
@@ -72,6 +85,8 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
      * related methods.
      */
     private final ReadableBackchannel<TransactionFinished> backchannel;
+    /** The context used to name transactions. */
+    private final TransactionNamingStrategyContextImpl namingContext;

     /**
      * Constructor creating a kafka writer.
@@ -95,6 +110,8 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
             WriterInitContext sinkInitContext,
             KafkaRecordSerializationSchema<IN> recordSerializer,
             SerializationSchema.InitializationContext schemaContext,
+            TransactionAbortStrategyImpl transactionAbortStrategy,
+            TransactionNamingStrategyImpl transactionNamingStrategy,
             Collection<KafkaWriterState> recoveredStates) {
         super(
                 deliveryGuarantee,
@@ -104,6 +121,11 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
                 schemaContext);
         this.transactionalIdPrefix =
                 checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null");
+        this.transactionAbortStrategy =
+                checkNotNull(transactionAbortStrategy, "transactionAbortStrategy must not be null");
+        this.transactionNamingStrategy =
+                checkNotNull(
+                        transactionNamingStrategy, "transactionNamingStrategy must not be null");

         try {
             recordSerializer.open(schemaContext, kafkaSinkContext);
@@ -127,6 +149,9 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
                         subtaskId,
                         sinkInitContext.getTaskInfo().getAttemptNumber(),
                         transactionalIdPrefix);
+        this.namingContext =
+                new TransactionNamingStrategyContextImpl(
+                        transactionalIdPrefix, subtaskId, restoredCheckpointId, producerPool);
     }

     @Override
@@ -147,13 +172,10 @@ public void initialize() {
     }

     private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
+        namingContext.setNextCheckpointId(checkpointId);
         FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                producerPool.getTransactionalProducer(
-                        TransactionalIdFactory.buildTransactionalId(
-                                transactionalIdPrefix,
-                                kafkaSinkContext.getParallelInstanceId(),
-                                checkpointId),
-                        checkpointId);
+                transactionNamingStrategy.getTransactionalProducer(namingContext);
+        namingContext.setLastCheckpointId(checkpointId);
         producer.beginTransaction();
         return producer;
     }
@@ -236,13 +258,34 @@ private void abortLingeringTransactions(
             }
         }

-        try (TransactionAborter transactionAborter =
-                new TransactionAborter(
-                        kafkaSinkContext.getParallelInstanceId(),
-                        kafkaSinkContext.getNumberOfParallelInstances(),
-                        id -> producerPool.getTransactionalProducer(id, startCheckpointId),
-                        producerPool::recycle)) {
-            transactionAborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId);
-        }
+        LOG.info(
+                "Aborting lingering transactions with prefixes {} using {}",
+                prefixesToAbort,
+                transactionAbortStrategy);
+        TransactionAbortStrategyContextImpl context =
+                getTransactionAbortStrategyContext(startCheckpointId, prefixesToAbort);
+        transactionAbortStrategy.abortTransactions(context);
+    }
+
+    private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
+            long startCheckpointId, List<String> prefixesToAbort) {
+        TransactionAbortStrategyImpl.TransactionAborter aborter =
+                transactionalId -> {
+                    // getTransactionalProducer already calls initTransactions, which cancels the
+                    // transaction
+                    FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                            producerPool.getTransactionalProducer(transactionalId, 0);
+                    LOG.debug("Aborting transaction {}", transactionalId);
+                    producer.flush();
+                    short epoch = producer.getEpoch();
+                    producerPool.recycle(producer);
+                    return epoch;
+                };
+        return new TransactionAbortStrategyContextImpl(
+                kafkaSinkContext.getParallelInstanceId(),
+                kafkaSinkContext.getNumberOfParallelInstances(),
+                prefixesToAbort,
+                startCheckpointId,
+                aborter);
     }
 }
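
The aborter lambda above suggests that TransactionAbortStrategyImpl.TransactionAborter is a small callback interface: the strategy decides which transactional ids to probe, while the writer performs the actual abort and reports back the producer epoch. A minimal sketch under that assumption; the method name and the epoch remark are inferred, not shown in this diff:

// Hypothetical sketch of the callback used by the abort strategy; only the
// lambda shape (transactional id in, epoch out) is visible in this commit.
@FunctionalInterface
interface TransactionAborter {
    /**
     * Aborts any open transaction for the given transactional id and returns the
     * resulting producer epoch. An epoch of 0 presumably indicates a never-used
     * id, which would let a probing strategy stop iterating over checkpoint ids.
     */
    short abortTransaction(String transactionalId);
}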

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 6 additions & 1 deletion
@@ -82,16 +82,19 @@ public class KafkaSink<IN>
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Properties kafkaProducerConfig;
     private final String transactionalIdPrefix;
+    private final TransactionNamingStrategy transactionNamingStrategy;

     KafkaSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
-            KafkaRecordSerializationSchema<IN> recordSerializer) {
+            KafkaRecordSerializationSchema<IN> recordSerializer,
+            TransactionNamingStrategy transactionNamingStrategy) {
         this.deliveryGuarantee = deliveryGuarantee;
         this.kafkaProducerConfig = kafkaProducerConfig;
         this.transactionalIdPrefix = transactionalIdPrefix;
         this.recordSerializer = recordSerializer;
+        this.transactionNamingStrategy = transactionNamingStrategy;
     }

     /**
@@ -141,6 +144,8 @@ public KafkaWriter<IN> restoreWriter(
                             context,
                             recordSerializer,
                             context.asSerializationSchemaInitializationContext(),
+                            transactionNamingStrategy.getAbortImpl(),
+                            transactionNamingStrategy.getImpl(),
                             recoveredState);
         } else {
             writer =
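
restoreWriter unpacks the public TransactionNamingStrategy into the two internal strategy enums via getAbortImpl() and getImpl(). A plausible reconstruction of the new public enum is sketched below; only DEFAULT and the two getters are confirmed by this diff, and the internal constant names INCREMENTING and PROBING are assumptions:

// Hypothetical reconstruction; constant names of the internal enums are assumed.
public enum TransactionNamingStrategy {
    DEFAULT(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING);

    private final TransactionNamingStrategyImpl impl;
    private final TransactionAbortStrategyImpl abortImpl;

    TransactionNamingStrategy(
            TransactionNamingStrategyImpl impl, TransactionAbortStrategyImpl abortImpl) {
        this.impl = impl;
        this.abortImpl = abortImpl;
    }

    /** Internal strategy that names the transactions. */
    TransactionNamingStrategyImpl getImpl() {
        return impl;
    }

    /** Internal strategy that aborts lingering transactions on recovery. */
    TransactionAbortStrategyImpl getAbortImpl() {
        return abortImpl;
    }
}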

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java

Lines changed: 23 additions & 4 deletions
@@ -74,6 +74,7 @@ public class KafkaSinkBuilder<IN> {

     private final Properties kafkaProducerConfig;
     private KafkaRecordSerializationSchema<IN> recordSerializer;
+    private TransactionNamingStrategy transactionNamingStrategy = TransactionNamingStrategy.DEFAULT;

     KafkaSinkBuilder() {
         kafkaProducerConfig = new Properties();
@@ -125,6 +126,20 @@ public KafkaSinkBuilder<IN> setProperty(String key, String value) {
         return this;
     }

+    /**
+     * Sets the {@link TransactionNamingStrategy} that is used to name the transactions.
+     *
+     * <p>By default {@link TransactionNamingStrategy#DEFAULT} is used. It's recommended to change
+     * the strategy only if specific issues occur.
+     */
+    public KafkaSinkBuilder<IN> setTransactionNamingStrategy(
+            TransactionNamingStrategy transactionNamingStrategy) {
+        this.transactionNamingStrategy =
+                checkNotNull(
+                        transactionNamingStrategy, "transactionNamingStrategy must not be null");
+        return this;
+    }
+
     /**
      * Sets the {@link KafkaRecordSerializationSchema} that transforms incoming records to {@link
      * org.apache.kafka.clients.producer.ProducerRecord}s.
@@ -162,7 +177,7 @@ public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefi
         checkState(
                 transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length
                         <= MAXIMUM_PREFIX_BYTES,
-                "The configured prefix is too long and the resulting transactionalId might exceed Kafka's transactionalIds size.");
+                "The configured prefix is too long and the resulting transactionalIdPrefix might exceed Kafka's transactionalIdPrefix size.");
         return this;
     }

@@ -180,12 +195,12 @@ private void sanityCheck() {
         checkNotNull(
                 kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                 "bootstrapServers");
+        checkNotNull(recordSerializer, "recordSerializer");
         if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
             checkState(
                     transactionalIdPrefix != null,
-                    "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
+                    "EXACTLY_ONCE delivery guarantee requires a transactionalIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
         }
-        checkNotNull(recordSerializer, "recordSerializer");
     }

@@ -196,6 +211,10 @@ private void sanityCheck() {
     public KafkaSink<IN> build() {
         sanityCheck();
         return new KafkaSink<>(
-                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, recordSerializer);
+                deliveryGuarantee,
+                kafkaProducerConfig,
+                transactionalIdPrefix,
+                recordSerializer,
+                transactionNamingStrategy);
     }
 }
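
Taken together, the builder change adds one optional knob. A minimal usage sketch follows; broker address, prefix, and topic are placeholders, and the setTransactionNamingStrategy call can be omitted entirely since DEFAULT is the fallback:

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder address
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-app") // required for EXACTLY_ONCE
                // new in this commit; equivalent to leaving the strategy unset
                .setTransactionNamingStrategy(TransactionNamingStrategy.DEFAULT)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic") // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();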

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java

Lines changed: 0 additions & 128 deletions
This file was deleted.
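
Per the commit message, this logic was not dropped but moved: the abort loop reappears in condensed form as the aborter lambda in ExactlyOnceKafkaWriter#getTransactionAbortStrategyContext, behind the new TransactionAbortStrategyImpl abstraction.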
