Skip to content

Commit 3652d2b

Browse files
committed
minor
1 parent 5af2d8c commit 3652d2b

File tree

3 files changed

+31
-28
lines changed

3 files changed

+31
-28
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.clients.consumer.ConsumerRecords;
2626
import org.apache.kafka.clients.consumer.KafkaConsumer;
2727
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
28+
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2829
import org.apache.kafka.clients.producer.internals.BufferPool;
2930
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
3031
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
@@ -659,34 +660,33 @@ public void initTransactions() {
659660
}
660661

661662
/**
662-
* Performs initialization of transactions functionality in this producer instance. This method bootstraps
663-
* the producer with a {@code producerId} and also resets the internal state of the producer following a previous
664-
* fatal error. Additionally, it allows setting the {@code keepPreparedTxn} flag which, if set to true, puts the producer
665-
* into a restricted state that only allows transaction completion operations.
666-
*
663+
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
664+
* with additional handling for two-phase commit (2PC). Must be called before any send operations
665+
* that require a {@code transactionalId}.
667666
* <p>
668-
* When {@code keepPreparedTxn} is set to {@code true}, the producer will be able to complete in-flight prepared
669-
* transactions, but will only allow calling {@link #commitTransaction()}, {@link #abortTransaction()}, or
670-
* the to-be-added {@code completeTransaction()} methods. This is to support recovery of prepared transactions
671-
* after a producer restart.
672-
*
667+
* Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
668+
* {@code true}, the producer does <em>not</em> automatically abort existing transactions
669+
* in the “prepare” phase. Instead, it enters a recovery mode allowing only finalization
670+
* of those previously prepared transactions. This behavior is crucial for 2PC scenarios,
671+
* where transactions should remain intact until the external transaction manager decides
672+
* whether to commit or abort.
673673
* <p>
674-
* Note that this method should only be called once during the lifetime of a producer instance, and must be
675-
* called before any other methods which require a {@code transactionalId} to be specified.
674+
* When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
675+
* initialization, aborting any unfinished transactions and resetting the producer for
676+
* new writes.
676677
*
677-
* @param keepPreparedTxn whether to keep prepared transactions, restricting the producer to only support completion of
678-
* prepared transactions. When set to true, the producer will only allow transaction completion
679-
* operations after initialization.
678+
* @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
679+
* recovery), false to abort existing transactions and behave like
680+
* the standard initTransactions
680681
*
681-
* @throws IllegalStateException if no {@code transactional.id} has been configured for the producer
682-
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating that the broker
683-
* does not support transactions (i.e. if its version is lower than 0.11.0.0). If this is encountered,
684-
* the producer cannot be used for transactional messaging.
685-
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
686-
* {@code transactional.id} is not authorized. If this is encountered, the producer cannot be used for
687-
* transactional messaging.
688-
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
689-
* @see #initTransactions()
682+
* @throws IllegalStateException if no {@code transactional.id} is configured
683+
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
684+
* support transactions (broker version < 0.11.0.0)
685+
* @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
686+
* {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
687+
* @throws KafkaException if the producer encounters a fatal error or any other unexpected error
688+
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
689+
* @throws InterruptException if the thread is interrupted while blocked
690690
*/
691691
public void initTransactions(boolean keepPreparedTxn) {
692692
throwIfNoTransactionManager();
@@ -746,7 +746,7 @@ public void beginTransaction() throws ProducerFencedException {
746746
* <p>
747747
* Note, that the consumer should have {@code enable.auto.commit=false} and should
748748
* also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
749-
* {@link KafkaConsumer#commitAsync()} (Map, OffsetCommitCallback) async} commits).
749+
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
750750
* This method will raise {@link TimeoutException} if the producer cannot send offsets before expiration of {@code max.block.ms}.
751751
* Additionally, it will raise {@link InterruptException} if interrupted.
752752
*

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ public class ProducerConfig extends AbstractConfig {
358358
/** <code> transaction.two.phase.commit.enable </code> */
359359
public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable";
360360
private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " +
361-
"two phase commit protocol and transactions that this client starts never expire.";
361+
"two phase commit protocol and transactions that this client starts never expire.";
362362

363363
/**
364364
* <code>security.providers</code>
@@ -620,7 +620,10 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
620620
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
621621
}
622622

623-
// validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
623+
// Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
624+
// In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
625+
// transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
626+
// decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
624627
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
625628
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
626629
if (enable2PC && userConfiguredTransactionTimeout) {

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ synchronized TransactionalRequestResult initializeTransactions(
319319
.setKeepPreparedTxn(keepPreparedTxn);
320320

321321
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
322-
isEpochBump);
322+
isEpochBump);
323323
enqueueRequest(handler);
324324
return handler.result;
325325
}, State.INITIALIZING, "initTransactions");

0 commit comments

Comments
 (0)