Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -596,14 +596,17 @@ private TransactionManager configureTransactionState(ProducerConfig config,

if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

transactionManager = new TransactionManager(
logContext,
transactionalId,
transactionTimeoutMs,
retryBackoffMs,
apiVersions
apiVersions,
enable2PC
);

if (transactionManager.isTransactional())
Expand Down Expand Up @@ -644,10 +647,43 @@ private TransactionManager configureTransactionState(ProducerConfig config,
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions() {
initTransactions(false);
}
Comment on lines 649 to +651
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Default Handling

The default implementation of initTransactions() hardcodes 'false' for keepPreparedTxn. This prevents configuration-based defaults and forces explicit method calls for 2PC functionality, potentially causing inconsistent transaction behavior.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Interface-Contract


/**
* Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
* with additional capabilities to keep a previously prepared transaction.
* Must be called before any send operations that require a {@code transactionalId}.
* <p>
* Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
* {@code true}, the producer does <em>not</em> automatically abort existing transactions.
* Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions.
Comment on lines +653 to +660
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete Documentation

Documentation doesn't clearly state that keepPreparedTxn=true requires enable2PC=true configuration. Missing this critical dependency information could lead to misuse of the API and transaction reliability issues when developers use keepPreparedTxn without enabling 2PC.

Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity
  • DbC-Interface-Documentation
  • SRE-Error-Prevention

*
* This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
* until the external transaction manager decides whether to commit or abort.
* <p>
* When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
* initialization, aborting any unfinished transactions and resetting the producer for
* new writes.
*
* @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
* recovery), false to abort existing transactions and behave like
* the standard initTransactions
*
* @throws IllegalStateException if no {@code transactional.id} is configured
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
* support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
* {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
* @throws KafkaException if the producer encounters a fatal error or any other unexpected error
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
Comment on lines +682 to +686
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Validation Check

The new initTransactions method lacks validation for 2PC compatibility when keepPreparedTxn=true. Without validation, using keepPreparedTxn without enable2PC could cause transaction recovery failures.

Suggested change
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
if (keepPreparedTxn && !transactionManager.isEnable2PC()) {
throw new IllegalStateException("Cannot use keepPreparedTxn=true without enabling two-phase commit");
}
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Preconditions

Comment on lines +682 to +686
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Transaction Validation

The new initTransactions(boolean) method doesn't validate that keepPreparedTxn=true is only used with enable2PC=true. Using keepPreparedTxn without 2PC enabled could cause transaction state inconsistencies since the broker won't be configured to handle prepared transactions properly.

Suggested change
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
// Validate that keepPreparedTxn is only used with enable2PC
if (keepPreparedTxn && !transactionManager.isEnable2PC()) {
throw new IllegalStateException("Cannot use keepPreparedTxn=true without enabling two-phase commit. " +
"Set " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG + "=true in the producer config.");
}
long now = time.nanoseconds();
TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition-Validation
  • SRE-Error-Prevention

sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public MockProducer() {
}

@Override
public void initTransactions() {
public void initTransactions(boolean keepPreparedTxn) {
verifyNotClosed();
verifyNotFenced();
if (this.transactionInitialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ public interface Producer<K, V> extends Closeable {
/**
* See {@link KafkaProducer#initTransactions()}
*/
void initTransactions();
default void initTransactions() {
initTransactions(false);
}

/**
* See {@link KafkaProducer#initTransactions(boolean)}
*/
void initTransactions(boolean keepPreparedTxn);

/**
* See {@link KafkaProducer#beginTransaction()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig {
"By default the TransactionId is not configured, which means transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code>.";

/** <code> transaction.two.phase.commit.enable </code> */
public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable";
private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " +
"two phase commit protocol and transactions that this client starts never expire.";

/**
* <code>security.providers</code>
*/
Expand Down Expand Up @@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC)
.define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
Expand Down Expand Up @@ -609,6 +619,20 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
if (!idempotenceEnabled && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}

// Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
// In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
// transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
// decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
Comment on lines +628 to +630
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configuration Validation Gap

Configuration validation only checks if transaction timeout is explicitly set, but doesn't validate when enable2PC=true with default transaction timeout. This creates inconsistent validation as the default timeout would still apply but isn't being rejected, potentially causing reliability issues in 2PC transactions.

Suggested change
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
if (enable2PC) {
// When 2PC is enabled, transaction timeout should not be used at all
// as transactions are managed by an external coordinator
if (originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG)) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
// Force transaction timeout to a very large value when 2PC is enabled
// to effectively disable timeout-based transaction expiration
this.values().put(TRANSACTION_TIMEOUT_CONFIG, Integer.MAX_VALUE);
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition-Validation
  • SRE-Configuration-Consistency

"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
Comment on lines +627 to +635
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configuration Validation Gap

The validation prevents setting transaction.timeout.ms with 2PC enabled, but doesn't handle the default transaction timeout. The default timeout will still be used internally even when 2PC is enabled, potentially causing inconsistent behavior.

Suggested change
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
// Override default transaction timeout when 2PC is enabled to ensure consistency
if (enable2PC) {
this.values().put(TRANSACTION_TIMEOUT_CONFIG, Integer.MAX_VALUE);
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition

}

private static String parseAcks(String acksString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class TransactionManager {
*
* <ul>
* <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li>
* <li>{@link Producer#initTransactions(boolean)} calls {@link #initializeTransactions(boolean)}</li>
* <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li>
* <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li>
* <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
Expand Down Expand Up @@ -195,6 +196,7 @@ public class TransactionManager {
private volatile boolean clientSideEpochBumpRequired = false;
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;
private final boolean enable2PC;

private enum State {
UNINITIALIZED,
Expand Down Expand Up @@ -255,7 +257,8 @@ public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final boolean enable2PC) {
Comment on lines +260 to +261
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transaction Timeout Validation

The validation prevents setting transaction.timeout.ms when 2PC is enabled, but doesn't handle the case where the default transaction.timeout.ms is still used internally. The TransactionManager constructor still receives the transactionTimeoutMs parameter without checking if 2PC is enabled.

Suggested change
final ApiVersions apiVersions,
final boolean enable2PC) {
final ApiVersions apiVersions,
final boolean enable2PC) {
if (enable2PC && transactionTimeoutMs != Integer.MAX_VALUE) {
log.info("Using MAX_VALUE for transaction timeout with 2PC enabled instead of {}", transactionTimeoutMs);
}
Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-Precondition

this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
Expand All @@ -273,17 +276,29 @@ public TransactionManager(final LogContext logContext,
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
this.enable2PC = enable2PC;
}

void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
}

public synchronized TransactionalRequestResult initializeTransactions() {
return initializeTransactions(ProducerIdAndEpoch.NONE);
return initializeTransactions(ProducerIdAndEpoch.NONE, false);
}

synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
return initializeTransactions(producerIdAndEpoch, false);
}

public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
}

Comment on lines 290 to +297
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean Parameter Clarity

Boolean parameters reduce readability and create implicit dependencies. Consider using enums or constants for the keepPreparedTxn parameter to improve code clarity and maintainability.

Standards
  • Clean-Code-Boolean-Parameters
  • Clean-Code-Meaningful-Names

synchronized TransactionalRequestResult initializeTransactions(
ProducerIdAndEpoch producerIdAndEpoch,
boolean keepPreparedTxn
) {
maybeFailWithError();

boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
Expand All @@ -292,14 +307,20 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
if (!isEpochBump) {
transitionTo(State.INITIALIZING);
log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
if (keepPreparedTxn) {
log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
}
} else {
log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
}
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
.setProducerEpoch(producerIdAndEpoch.epoch)
.setEnable2Pc(enable2PC)
.setKeepPreparedTxn(keepPreparedTxn);
Comment on lines 316 to +322
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InitProducerId Request Optimization

The InitProducerId request now includes two additional flags that increase request size. For high-throughput systems with frequent producer initialization, consider batching producer initializations or implementing a connection pooling strategy to amortize the cost of these larger requests.

Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Hot-Path-Optimization


InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
isEpochBump);
enqueueRequest(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
Expand Down Expand Up @@ -1289,7 +1291,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception {
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));

Future<?> future = executor.submit(producer::initTransactions);
Future<?> future = executor.submit(() -> producer.initTransactions());
TestUtils.waitForCondition(client::hasInFlightRequests,
"Timed out while waiting for expected `InitProducerId` request to be sent");

Expand Down Expand Up @@ -1364,6 +1366,59 @@ public void testInitTransactionWhileThrottled() {
}
}

@ParameterizedTest
@CsvSource({
"true, false",
"true, true",
"false, true"
})
public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
if (enable2PC) {
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
}

Time time = new MockTime(1);
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
MockClient client = new MockClient(time, metadata);
client.updateMetadata(initialUpdateResponse);

// Capture flags from the InitProducerIdRequest
boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]

client.prepareResponse(
request -> request instanceof FindCoordinatorRequest &&
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));

client.prepareResponse(
request -> {
if (request instanceof InitProducerIdRequest) {
InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
requestFlags[0] = initRequest.data().keepPreparedTxn();
Comment on lines +1400 to +1402
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null Safety Check

The test code accesses initRequest.data() without null checking. If data() returns null, this would cause a NullPointerException, making tests unreliable and potentially masking real issues.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

requestFlags[1] = initRequest.data().enable2Pc();
return true;
}
return false;
},
initProducerIdResponse(1L, (short) 5, Errors.NONE));

try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
producer.initTransactions(keepPreparedTxn);

// Verify request flags match expected values
assertEquals(keepPreparedTxn, requestFlags[0],
"keepPreparedTxn flag should match input parameter");
assertEquals(enable2PC, requestFlags[1],
"enable2Pc flag should match producer configuration");
Comment on lines +1412 to +1418
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null Safety Check

The test accesses requestFlags array without checking if the InitProducerIdRequest was actually processed. If the request handler wasn't called, requestFlags would contain default values (false), potentially causing false test failures.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Test-Reliability

}
}

@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,27 @@ void testUpperboundCheckOfEnableIdempotence() {
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
assertDoesNotThrow(() -> new ProducerConfig(configs));
}

@Test
void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG));

// Verify that setting one but not the other is valid
configs.remove(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
assertDoesNotThrow(() -> new ProducerConfig(configs));

configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
assertDoesNotThrow(() -> new ProducerConfig(configs));
}
}
Loading
Loading