Skip to content

Commit 5af2d8c

Browse files
committed
Add enable2pc to txn manager and add overloaded method initProducerId(bool)
1 parent 7c6e0cf commit 5af2d8c

File tree

7 files changed

+111
-28
lines changed

7 files changed

+111
-28
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.kafka.clients.consumer.ConsumerRecords;
2626
import org.apache.kafka.clients.consumer.KafkaConsumer;
2727
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
28-
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2928
import org.apache.kafka.clients.producer.internals.BufferPool;
3029
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
3130
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
@@ -598,14 +597,17 @@ private TransactionManager configureTransactionState(ProducerConfig config,
598597

599598
if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
600599
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
600+
final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
601601
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
602602
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
603+
603604
transactionManager = new TransactionManager(
604605
logContext,
605606
transactionalId,
606607
transactionTimeoutMs,
607608
retryBackoffMs,
608-
apiVersions
609+
apiVersions,
610+
enable2PC
609611
);
610612

611613
if (transactionManager.isTransactional())
@@ -656,6 +658,47 @@ public void initTransactions() {
656658
transactionManager.maybeUpdateTransactionV2Enabled(true);
657659
}
658660

661+
/**
662+
* Performs initialization of transactions functionality in this producer instance. This method bootstraps
663+
* the producer with a {@code producerId} and also resets the internal state of the producer following a previous
664+
* fatal error. Additionally, it allows setting the {@code keepPreparedTxn} flag which, if set to true, puts the producer
665+
* into a restricted state that only allows transaction completion operations.
666+
*
667+
* <p>
668+
* When {@code keepPreparedTxn} is set to {@code true}, the producer will be able to complete in-flight prepared
669+
* transactions, but will only allow calling {@link #commitTransaction()}, {@link #abortTransaction()}, or
670+
* the to-be-added {@code completeTransaction()} methods. This is to support recovery of prepared transactions
671+
* after a producer restart.
672+
*
673+
* <p>
674+
* Note that this method should only be called once during the lifetime of a producer instance, and must be
675+
* called before any other methods which require a {@code transactionalId} to be specified.
676+
*
677+
* @param keepPreparedTxn whether to keep prepared transactions, restricting the producer to only support completion of
678+
* prepared transactions. When set to true, the producer will only allow transaction completion
679+
* operations after initialization.
680+
*
681+
* @throws IllegalStateException if no {@code transactional.id} has been configured for the producer
682+
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating that the broker
683+
* does not support transactions (i.e. if its version is lower than 0.11.0.0). If this is encountered,
684+
* the producer cannot be used for transactional messaging.
685+
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
686+
* {@code transactional.id} is not authorized. If this is encountered, the producer cannot be used for
687+
* transactional messaging.
688+
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
689+
* @see #initTransactions()
690+
*/
691+
public void initTransactions(boolean keepPreparedTxn) {
692+
throwIfNoTransactionManager();
693+
throwIfProducerClosed();
694+
long now = time.nanoseconds();
695+
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
696+
sender.wakeup();
697+
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
698+
producerMetrics.recordInit(time.nanoseconds() - now);
699+
transactionManager.maybeUpdateTransactionV2Enabled(true);
700+
}
701+
659702
/**
660703
* Should be called before the start of each new transaction. Note that prior to the first invocation
661704
* of this method, you must invoke {@link #initTransactions()} exactly one time.
@@ -703,7 +746,7 @@ public void beginTransaction() throws ProducerFencedException {
703746
* <p>
704747
* Note, that the consumer should have {@code enable.auto.commit=false} and should
705748
* also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
706-
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
749+
* {@link KafkaConsumer#commitAsync()} (Map, OffsetCommitCallback) async} commits).
707750
* This method will raise {@link TimeoutException} if the producer cannot send offsets before expiration of {@code max.block.ms}.
708751
* Additionally, it will raise {@link InterruptException} if interrupted.
709752
*

clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,23 @@ public void initTransactions() {
158158
this.sentOffsets = false;
159159
}
160160

161+
@Override
162+
public void initTransactions(boolean keepPreparedTxn) {
163+
verifyNotClosed();
164+
verifyNotFenced();
165+
if (this.transactionInitialized) {
166+
throw new IllegalStateException("MockProducer has already been initialized for transactions.");
167+
}
168+
if (this.initTransactionException != null) {
169+
throw this.initTransactionException;
170+
}
171+
this.transactionInitialized = true;
172+
this.transactionInFlight = false;
173+
this.transactionCommitted = false;
174+
this.transactionAborted = false;
175+
this.sentOffsets = false;
176+
}
177+
161178
@Override
162179
public void beginTransaction() throws ProducerFencedException {
163180
verifyNotClosed();

clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ public interface Producer<K, V> extends Closeable {
4444
*/
4545
void initTransactions();
4646

47+
/**
48+
* See {@link KafkaProducer#initTransactions(boolean)}
49+
*/
50+
void initTransactions(boolean keepPreparedTxn);
51+
4752
/**
4853
* See {@link KafkaProducer#beginTransaction()}
4954
*/

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public class TransactionManager {
138138
*
139139
* <ul>
140140
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
141+
* <li>{@link Producer#initTransactions(boolean)} calls {@link #initializeTransactions(boolean)}</li>
141142
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
142143
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
143144
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
@@ -195,6 +196,7 @@ public class TransactionManager {
195196
private volatile boolean clientSideEpochBumpRequired = false;
196197
private volatile long latestFinalizedFeaturesEpoch = -1;
197198
private volatile boolean isTransactionV2Enabled = false;
199+
private final boolean enable2PC;
198200

199201
private enum State {
200202
UNINITIALIZED,
@@ -255,7 +257,8 @@ public TransactionManager(final LogContext logContext,
255257
final String transactionalId,
256258
final int transactionTimeoutMs,
257259
final long retryBackoffMs,
258-
final ApiVersions apiVersions) {
260+
final ApiVersions apiVersions,
261+
final boolean enable2PC) {
259262
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
260263
this.transactionalId = transactionalId;
261264
this.log = logContext.logger(TransactionManager.class);
@@ -273,17 +276,29 @@ public TransactionManager(final LogContext logContext,
273276
this.retryBackoffMs = retryBackoffMs;
274277
this.txnPartitionMap = new TxnPartitionMap(logContext);
275278
this.apiVersions = apiVersions;
279+
this.enable2PC = enable2PC;
276280
}
277281

278282
void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
279283
shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
280284
}
281285

282286
public synchronized TransactionalRequestResult initializeTransactions() {
283-
return initializeTransactions(ProducerIdAndEpoch.NONE);
287+
return initializeTransactions(ProducerIdAndEpoch.NONE, false);
284288
}
285289

286290
synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
291+
return initializeTransactions(producerIdAndEpoch, false);
292+
}
293+
294+
public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
295+
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
296+
}
297+
298+
synchronized TransactionalRequestResult initializeTransactions(
299+
ProducerIdAndEpoch producerIdAndEpoch,
300+
boolean keepPreparedTxn
301+
) {
287302
maybeFailWithError();
288303

289304
boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
@@ -299,9 +314,12 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
299314
.setTransactionalId(transactionalId)
300315
.setTransactionTimeoutMs(transactionTimeoutMs)
301316
.setProducerId(producerIdAndEpoch.producerId)
302-
.setProducerEpoch(producerIdAndEpoch.epoch);
317+
.setProducerEpoch(producerIdAndEpoch.epoch)
318+
.setEnable2Pc(enable2PC)
319+
.setKeepPreparedTxn(keepPreparedTxn);
320+
303321
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
304-
isEpochBump);
322+
isEpochBump);
305323
enqueueRequest(handler);
306324
return handler.result;
307325
}, State.INITIALIZING, "initTransactions");

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1289,7 +1289,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception {
12891289
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
12901290
FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
12911291

1292-
Future<?> future = executor.submit(producer::initTransactions);
1292+
Future<?> future = executor.submit(() -> producer.initTransactions());
12931293
TestUtils.waitForCondition(client::hasInFlightRequests,
12941294
"Timed out while waiting for expected `InitProducerId` request to be sent");
12951295

0 commit comments

Comments
 (0)