Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -596,14 +596,17 @@ private TransactionManager configureTransactionState(ProducerConfig config,

if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

transactionManager = new TransactionManager(
logContext,
transactionalId,
transactionTimeoutMs,
retryBackoffMs,
apiVersions
apiVersions,
enable2PC
);

if (transactionManager.isTransactional())
Expand Down Expand Up @@ -644,10 +647,43 @@ private TransactionManager configureTransactionState(ProducerConfig config,
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions() {
initTransactions(false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic Boolean Parameter

Boolean literal parameter reduces code readability and makes the intent unclear at call sites. The 'false' value requires developers to check method documentation to understand behavior. Consider using an enum or named constants to make the intent explicit and improve self-documenting code.

Standards
  • Clean-Code-Functions
  • Maintainability-Quality-Readability
  • Clean-Code-Meaningful-Names

}

/**
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
* with additional capabilities to keep a previously prepared transaction.
* Must be called before any send operations that require a {@code transactionalId}.
* <p>
* Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
* {@code true}, the producer does <em>not</em> automatically abort existing transactions.
* Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions.
*
* This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
* until the external transaction manager decides whether to commit or abort.
* <p>
* When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
* initialization, aborting any unfinished transactions and resetting the producer for
* new writes.
*
* @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
* recovery), false to abort existing transactions and behave like
* the standard initTransactions
*
* @throws IllegalStateException if no {@code transactional.id} is configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
* support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
* {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
* @throws KafkaException if the producer encounters a fatal error or any other unexpected error
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public MockProducer() {
}

@Override
public void initTransactions() {
public void initTransactions(boolean keepPreparedTxn) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Default Implementation

MockProducer implements parameterized initTransactions but removes default no-argument version. Interface provides default delegation to initTransactions(false), but MockProducer override breaks this delegation pattern. This creates API inconsistency where MockProducer behavior differs from KafkaProducer for default initTransactions() calls.

    @Override
    public void initTransactions() {
        initTransactions(false);
    }

    @Override
    public void initTransactions(boolean keepPreparedTxn) {
Commitable Suggestion
Suggested change
public void initTransactions(boolean keepPreparedTxn) {
@Override
public void initTransactions() {
initTransactions(false);
}
@Override
public void initTransactions(boolean keepPreparedTxn) {
Standards
  • Algorithm-Correctness-Interface-Implementation
  • Business-Rule-API-Consistency
  • Logic-Verification-Contract-Compliance

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Default Implementation

MockProducer overrides initTransactions(boolean) but removes default initTransactions() method implementation. Producer interface defines default initTransactions() calling initTransactions(false), but MockProducer signature change breaks this contract. This creates inconsistent behavior between real and mock implementations.

    @Override
    public void initTransactions() {
        initTransactions(false);
    }

    @Override
    public void initTransactions(boolean keepPreparedTxn) {
Commitable Suggestion
Suggested change
public void initTransactions(boolean keepPreparedTxn) {
@Override
public void initTransactions() {
initTransactions(false);
}
@Override
public void initTransactions(boolean keepPreparedTxn) {
Standards
  • Algorithm-Correctness-Interface-Contract
  • Business-Rule-API-Consistency

verifyNotClosed();
verifyNotFenced();
if (this.transactionInitialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ public interface Producer<K, V> extends Closeable {
/**
* See {@link KafkaProducer#initTransactions()}
*/
void initTransactions();
default void initTransactions() {
initTransactions(false);
}
Comment on lines +45 to +47
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that breaking change for external implementations is acceptable.

Converting initTransactions() from abstract to a default method that delegates to the new initTransactions(boolean) introduces a breaking change: any external implementations of the Producer interface must now implement the boolean variant. While call-site compatibility is preserved (existing callers can still use the no-arg version), implementers who previously only overrode initTransactions() will face compilation errors until they implement initTransactions(boolean keepPreparedTxn).

Run the following script to identify any external implementations that might be affected:


🏁 Script executed:

#!/bin/bash
# Description: Search for external Producer interface implementations to assess breaking change impact

# Search for classes implementing Producer interface
rg -n --type=java -A 5 -B 2 'class\s+\w+.*implements.*Producer<'

Length of output: 1948


Avoid breaking change to external Producer implementations
Adding a new abstract method initTransactions(boolean keepPreparedTxn) to the Producer interface will break any external implementers. Either make initTransactions(boolean) a default method (delegating to initTransactions()) or reserve this change for a major-version bump. Also expand the Javadoc to explain the semantics of the new keepPreparedTxn parameter.


/**
* See {@link KafkaProducer#initTransactions(boolean)}
*/
void initTransactions(boolean keepPreparedTxn);
Comment on lines +45 to +52
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Revert the delegation direction to preserve compatibility: keep the existing no-arg initTransactions() signature as the abstract method and provide a default implementation for the new initTransactions(boolean) overload that delegates to the original no-arg contract. [possible issue]

Comment on lines +49 to +52
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Expand Javadoc to explain the keepPreparedTxn parameter.

The Javadoc merely references KafkaProducer#initTransactions(boolean) without explaining the semantics of keepPreparedTxn. API consumers need to understand when to pass true vs false and how it affects transaction recovery, especially in two-phase commit scenarios.

Consider adding detailed documentation similar to this:

 /**
- * See {@link KafkaProducer#initTransactions(boolean)}
+ * Initializes transactions for this producer, optionally preserving prepared transactions
+ * during recovery.
+ *
+ * @param keepPreparedTxn If true, any prepared (not yet committed or aborted) transactions
+ *                        are preserved during initialization, enabling two-phase commit
+ *                        recovery flows. If false (default), prepared transactions are
+ *                        aborted during initialization.
+ *
+ * @see KafkaProducer#initTransactions(boolean)
  */
 void initTransactions(boolean keepPreparedTxn);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/**
* See {@link KafkaProducer#initTransactions(boolean)}
*/
void initTransactions(boolean keepPreparedTxn);
/**
* Initializes transactions for this producer, optionally preserving prepared transactions
* during recovery.
*
* @param keepPreparedTxn If true, any prepared (not yet committed or aborted) transactions
* are preserved during initialization, enabling two-phase commit
* recovery flows. If false (default), prepared transactions are
* aborted during initialization.
*
* @see KafkaProducer#initTransactions(boolean)
*/
void initTransactions(boolean keepPreparedTxn);
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/clients/producer/Producer.java around
lines 49 to 52, the Javadoc for initTransactions(boolean) only references
KafkaProducer#initTransactions(boolean) and lacks an explanation of the
keepPreparedTxn parameter; update the Javadoc to describe the semantics of
keepPreparedTxn (what true vs false means), when callers should pass true or
false (e.g., preserving prepared but uncommitted transactions for two-phase
commit/recovery vs discarding them), how it affects transaction recovery and
visibility on broker restart, and any safety/ordering or client-state
implications so API consumers can choose correctly.


/**
* See {@link KafkaProducer#beginTransaction()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig {
"By default the TransactionId is not configured, which means transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code>.";

/** <code> transaction.two.phase.commit.enable </code> */
public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable";
private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " +
"two phase commit protocol and transactions that this client starts never expire.";

/**
* <code>security.providers</code>
*/
Expand Down Expand Up @@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC)
.define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
Comment on lines +534 to +538
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded Configuration Values

Configuration value hardcoded as 'false' default violates organization guideline against hardcoding variables. This creates inflexibility in deployment environments where different defaults may be required. Consider using environment-based configuration or making the default configurable through build-time parameters.

Standards
  • Org-Guideline-Hardcoding of variables
  • CWE-547
  • OWASP-A05

Comment on lines +534 to +538
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded Configuration Values

Configuration parameter uses hardcoded default value 'false' instead of externalized configuration. This violates organization guideline against hardcoding variables and creates maintenance burden when default behavior needs modification. Consider using environment-based or profile-based configuration defaults to improve flexibility and maintainability.

Standards
  • CWE-547
  • OWASP-A05

.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
Expand Down Expand Up @@ -609,6 +619,20 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
if (!idempotenceEnabled && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}

// Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
// In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
// transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
// decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
Comment on lines +622 to +635
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Add validation that 2PC requires transactional.id.

The validation correctly prevents using transaction.timeout.ms with 2PC, but doesn't verify that 2PC requires transactional.id to be set. Two-phase commit is meaningless without transactions, so enabling 2PC without a transactional ID should fail during configuration validation rather than at runtime.

Add this validation after line 621:

         if (!idempotenceEnabled && userConfiguredTransactions) {
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
         }
+
+        // Validate that transaction.two.phase.commit.enable requires transactional.id
+        boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
+        if (enable2PC && !userConfiguredTransactions) {
+            throw new ConfigException(
+                "Cannot enable " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
+                " without setting a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + ". " +
+                "Two-phase commit requires transactional producers."
+            );
+        }

Then move the existing 2PC validation to check userConfiguredTransactionTimeout:

-        // Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
-        // In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
-        // transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
-        // decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
-        boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
-        boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
-        if (enable2PC && userConfiguredTransactionTimeout) {
+        // Validate that transaction.timeout.ms is not explicitly set when 2PC is enabled
+        if (enable2PC && originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG)) {
             throw new ConfigException(
                 "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
                 " when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
                 " is set to true. Transactions will not expire with two-phase commit enabled."
             );
         }

Clarify behavior of default transaction.timeout.ms with 2PC enabled.

The validation only prevents explicitly configured transaction.timeout.ms when 2PC is enabled, but transaction.timeout.ms has a default value (60000ms, line 525). Please verify and document whether:

  1. The default timeout is ignored when 2PC is enabled (handled in runtime code), or
  2. The timeout applies even with 2PC (which would contradict the "never expire" semantics)

If the default timeout should be ignored with 2PC, consider adding a comment explaining this behavior to prevent future confusion.


🏁 Script executed:

#!/bin/bash
set -e
# Extract lines around idempotence and transactions validation
sed -n '600,650p' clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

# Extract lines around 2PC validation
sed -n '650,700p' clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

# Locate userConfiguredTransactions definition
grep -R -n "userConfiguredTransactions" clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

# Search for any existing check requiring transactional.id when 2PC is enabled
grep -R -n "TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG" clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Length of output: 4804


Validate that transaction.two.phase.commit.enable requires transactional.id.

  • Add a config-time check after the idempotence/transactional.id validation to throw a ConfigException when transaction.two.phase.commit.enable=true and no transactional.id is set. Two-phase commit without a transactional ID is invalid.

  • Clarify in comments whether the default transaction.timeout.ms is ignored when 2PC is enabled or still enforced at runtime to match “never expire” semantics.

}

private static String parseAcks(String acksString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class TransactionManager {
*
* <ul>
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
* <li>{@link Producer#initTransactions(boolean)} calls {@link #initializeTransactions(boolean)}</li>
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
Expand Down Expand Up @@ -195,6 +196,7 @@ public class TransactionManager {
private volatile boolean clientSideEpochBumpRequired = false;
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;
private final boolean enable2PC;

private enum State {
UNINITIALIZED,
Expand Down Expand Up @@ -255,7 +257,8 @@ public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final boolean enable2PC) {
Comment on lines 257 to +261
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Constructor Validation

Constructor accepts enable2PC parameter without validation against transactionTimeoutMs consistency. Configuration validation occurs in ProducerConfig but constructor allows inconsistent state creation. Runtime failures possible when enable2PC=true with non-zero transactionTimeoutMs creating conflicting timeout behaviors.

    public TransactionManager(final LogContext logContext,
                              final String transactionalId,
                              final int transactionTimeoutMs,
                              final long retryBackoffMs,
                              final ApiVersions apiVersions,
                              final boolean enable2PC) {
        if (enable2PC && transactionTimeoutMs > 0) {
            throw new IllegalArgumentException(
                "Cannot use transaction timeout when two-phase commit is enabled. " +
                "Transactions will not expire with two-phase commit enabled.");
        }
Commitable Suggestion
Suggested change
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final boolean enable2PC) {
public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions,
final boolean enable2PC) {
if (enable2PC && transactionTimeoutMs > 0) {
throw new IllegalArgumentException(
"Cannot use transaction timeout when two-phase commit is enabled. " +
"Transactions will not expire with two-phase commit enabled.");
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • DbC-Preconditions

Comment on lines 257 to +261
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor Parameter Explosion

Constructor now has 6 parameters violating Clean Code parameter limit guidelines. Adding enable2PC parameter increases coupling and makes instantiation complex. This pattern will continue growing as more transaction features are added, reducing maintainability and testability.

Standards
  • Clean-Code-Functions
  • SOLID-SRP

this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
Expand All @@ -273,17 +276,29 @@ public TransactionManager(final LogContext logContext,
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
this.enable2PC = enable2PC;
}

void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
}

public synchronized TransactionalRequestResult initializeTransactions() {
return initializeTransactions(ProducerIdAndEpoch.NONE);
return initializeTransactions(ProducerIdAndEpoch.NONE, false);
}

synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
return initializeTransactions(producerIdAndEpoch, false);
}

public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
Comment on lines 286 to +295
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method Overloading Chain

Four overloaded methods create a complex delegation chain with boolean parameters. This telescoping constructor anti-pattern makes the API confusing and error-prone. The multiple overloads with different parameter combinations reduce code clarity and increase maintenance burden.

Standards
  • Clean-Code-Functions
  • Design-Pattern-Builder

}

synchronized TransactionalRequestResult initializeTransactions(
ProducerIdAndEpoch producerIdAndEpoch,
boolean keepPreparedTxn
) {
Comment on lines 286 to +301
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method Overload Chain

Multiple overloaded methods create a complex delegation chain that reduces readability and increases maintenance burden. The four-method overload pattern makes it difficult to understand the primary implementation. Consider consolidating into fewer methods with optional parameters or using a configuration object.

Standards
  • Clean-Code-Functions
  • Maintainability-Quality-Complexity
  • SOLID-SRP

maybeFailWithError();

boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
Expand All @@ -292,14 +307,20 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
if (!isEpochBump) {
transitionTo(State.INITIALIZING);
log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
if (keepPreparedTxn) {
log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
}
} else {
log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
}
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
.setProducerEpoch(producerIdAndEpoch.epoch)
.setEnable2Pc(enable2PC)
.setKeepPreparedTxn(keepPreparedTxn);

InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
isEpochBump);
enqueueRequest(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
Expand Down Expand Up @@ -1289,7 +1291,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception {
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));

Future<?> future = executor.submit(producer::initTransactions);
Future<?> future = executor.submit(() -> producer.initTransactions());
TestUtils.waitForCondition(client::hasInFlightRequests,
"Timed out while waiting for expected `InitProducerId` request to be sent");

Expand Down Expand Up @@ -1364,6 +1366,59 @@ public void testInitTransactionWhileThrottled() {
}
}

@ParameterizedTest
@CsvSource({
"true, false",
"true, true",
"false, true"
})
Comment on lines +1370 to +1374

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To enhance test completeness, consider adding the "false, false" case to your @CsvSource. While this represents the standard transaction initialization and is likely covered in other tests, its inclusion would make this parameterized test exhaustive by explicitly covering all four boolean combinations of keepPreparedTxn and enable2PC.

    @CsvSource({
        "true, false",
        "true, true",
        "false, true",
        "false, false"
    })

public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
if (enable2PC) {
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
}

Time time = new MockTime(1);
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
MockClient client = new MockClient(time, metadata);
client.updateMetadata(initialUpdateResponse);

// Capture flags from the InitProducerIdRequest
boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]

client.prepareResponse(
request -> request instanceof FindCoordinatorRequest &&
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));

client.prepareResponse(
request -> {
if (request instanceof InitProducerIdRequest) {
InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
requestFlags[0] = initRequest.data().keepPreparedTxn();
requestFlags[1] = initRequest.data().enable2Pc();
return true;
}
return false;
},
initProducerIdResponse(1L, (short) 5, Errors.NONE));

try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
producer.initTransactions(keepPreparedTxn);

// Verify request flags match expected values
assertEquals(keepPreparedTxn, requestFlags[0],
"keepPreparedTxn flag should match input parameter");
assertEquals(enable2PC, requestFlags[1],
"enable2Pc flag should match producer configuration");
}
}

@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,27 @@ void testUpperboundCheckOfEnableIdempotence() {
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
assertDoesNotThrow(() -> new ProducerConfig(configs));
}

@Test
void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG));

// Verify that setting one but not the other is valid
configs.remove(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
assertDoesNotThrow(() -> new ProducerConfig(configs));

configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
assertDoesNotThrow(() -> new ProducerConfig(configs));
}
}
Loading
Loading