-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-19082: [1/4] Add client config for enable2PC and overloaded initProducerId (KIP-939) #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7c6e0cf
5af2d8c
3652d2b
eda67cf
f99b31f
b1b0072
a17217c
5ccb444
4618b63
4a08713
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -596,14 +596,17 @@ private TransactionManager configureTransactionState(ProducerConfig config, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| transactionManager = new TransactionManager( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logContext, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| transactionalId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| transactionTimeoutMs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retryBackoffMs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| apiVersions | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| apiVersions, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| enable2PC | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (transactionManager.isTransactional()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -644,10 +647,43 @@ private TransactionManager configureTransactionState(ProducerConfig config, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws InterruptException if the thread is interrupted while blocked | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void initTransactions() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| initTransactions(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * with additional capabilities to keep a previously prepared transaction. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Must be called before any send operations that require a {@code transactionalId}. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * <p> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * {@code true}, the producer does <em>not</em> automatically abort existing transactions. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Instead, it enters a recovery mode allowing only finalization of those previously prepared transactions. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+653
to
+660
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incomplete DocumentationDocumentation doesn't clearly state that keepPreparedTxn=true requires enable2PC=true configuration. Missing this critical dependency information could lead to misuse of the API and transaction reliability issues when developers use keepPreparedTxn without enabling 2PC. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * This behavior is especially crucial for 2PC scenarios, where transactions should remain intact | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * until the external transaction manager decides whether to commit or abort. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * <p> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * initialization, aborting any unfinished transactions and resetting the producer for | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * new writes. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * recovery), false to abort existing transactions and behave like | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the standard initTransactions | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws IllegalStateException if no {@code transactional.id} is configured | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * support transactions (i.e. if its version is lower than 0.11.0.0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * {@code transactional.id} is unauthorized either for normal transaction writes or 2PC. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws KafkaException if the producer encounters a fatal error or any other unexpected error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws InterruptException if the thread is interrupted while blocked | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void initTransactions(boolean keepPreparedTxn) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throwIfNoTransactionManager(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throwIfProducerClosed(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| long now = time.nanoseconds(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TransactionalRequestResult result = transactionManager.initializeTransactions(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+682
to
+686
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Validation CheckThe new initTransactions method lacks validation for 2PC compatibility when keepPreparedTxn=true. Without validation, using keepPreparedTxn without enable2PC could cause transaction recovery failures.
Suggested change
Standards
Comment on lines
+682
to
+686
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Transaction ValidationThe new initTransactions(boolean) method doesn't validate that keepPreparedTxn=true is only used with enable2PC=true. Using keepPreparedTxn without 2PC enabled could cause transaction state inconsistencies since the broker won't be configured to handle prepared transactions properly.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sender.wakeup(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| producerMetrics.recordInit(time.nanoseconds() - now); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig { | |||||||||||||||||||||||||||||||||||||||||||||
| "By default the TransactionId is not configured, which means transactions cannot be used. " + | ||||||||||||||||||||||||||||||||||||||||||||||
| "Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code>."; | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| /** <code> transaction.two.phase.commit.enable </code> */ | ||||||||||||||||||||||||||||||||||||||||||||||
| public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = "transaction.two.phase.commit.enable"; | ||||||||||||||||||||||||||||||||||||||||||||||
| private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " + | ||||||||||||||||||||||||||||||||||||||||||||||
| "two phase commit protocol and transactions that this client starts never expire."; | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||
| * <code>security.providers</code> | ||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig { | |||||||||||||||||||||||||||||||||||||||||||||
| new ConfigDef.NonEmptyString(), | ||||||||||||||||||||||||||||||||||||||||||||||
| Importance.LOW, | ||||||||||||||||||||||||||||||||||||||||||||||
| TRANSACTIONAL_ID_DOC) | ||||||||||||||||||||||||||||||||||||||||||||||
| .define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, | ||||||||||||||||||||||||||||||||||||||||||||||
| Type.BOOLEAN, | ||||||||||||||||||||||||||||||||||||||||||||||
| false, | ||||||||||||||||||||||||||||||||||||||||||||||
| Importance.LOW, | ||||||||||||||||||||||||||||||||||||||||||||||
| TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC) | ||||||||||||||||||||||||||||||||||||||||||||||
| .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, | ||||||||||||||||||||||||||||||||||||||||||||||
| Type.STRING, | ||||||||||||||||||||||||||||||||||||||||||||||
| CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY, | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -609,6 +619,20 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object> | |||||||||||||||||||||||||||||||||||||||||||||
| if (!idempotenceEnabled && userConfiguredTransactions) { | ||||||||||||||||||||||||||||||||||||||||||||||
| throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true | ||||||||||||||||||||||||||||||||||||||||||||||
| // In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any | ||||||||||||||||||||||||||||||||||||||||||||||
| // transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator | ||||||||||||||||||||||||||||||||||||||||||||||
| // decides when to finalize, so broker-side timeouts don't apply. Disallow using both. | ||||||||||||||||||||||||||||||||||||||||||||||
| boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG); | ||||||||||||||||||||||||||||||||||||||||||||||
| boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG); | ||||||||||||||||||||||||||||||||||||||||||||||
| if (enable2PC && userConfiguredTransactionTimeout) { | ||||||||||||||||||||||||||||||||||||||||||||||
| throw new ConfigException( | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+628
to
+630
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Configuration Validation GapConfiguration validation only checks if transaction timeout is explicitly set, but doesn't validate when enable2PC=true with default transaction timeout. This creates inconsistent validation as the default timeout would still apply but isn't being rejected, potentially causing reliability issues in 2PC transactions.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||
| "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG + | ||||||||||||||||||||||||||||||||||||||||||||||
| " when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG + | ||||||||||||||||||||||||||||||||||||||||||||||
| " is set to true. Transactions will not expire with two-phase commit enabled." | ||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+627
to
+635
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Configuration Validation GapThe validation prevents setting transaction.timeout.ms with 2PC enabled, but doesn't handle the default transaction timeout. The default timeout will still be used internally even when 2PC is enabled, potentially causing inconsistent behavior.
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| private static String parseAcks(String acksString) { | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -138,6 +138,7 @@ public class TransactionManager { | |||||||||||||||
| * | ||||||||||||||||
| * <ul> | ||||||||||||||||
| * <li>{@link Producer#initTransactions()} calls {@link #initializeTransactions()}</li> | ||||||||||||||||
| * <li>{@link Producer#initTransactions(boolean)} calls {@link #initializeTransactions(boolean)}</li> | ||||||||||||||||
| * <li>{@link Producer#beginTransaction()} calls {@link #beginTransaction()}</li> | ||||||||||||||||
| * <li>{@link Producer#commitTransaction()}} calls {@link #beginCommit()}</li> | ||||||||||||||||
| * <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()} | ||||||||||||||||
|
|
@@ -195,6 +196,7 @@ public class TransactionManager { | |||||||||||||||
| private volatile boolean clientSideEpochBumpRequired = false; | ||||||||||||||||
| private volatile long latestFinalizedFeaturesEpoch = -1; | ||||||||||||||||
| private volatile boolean isTransactionV2Enabled = false; | ||||||||||||||||
| private final boolean enable2PC; | ||||||||||||||||
|
|
||||||||||||||||
| private enum State { | ||||||||||||||||
| UNINITIALIZED, | ||||||||||||||||
|
|
@@ -255,7 +257,8 @@ public TransactionManager(final LogContext logContext, | |||||||||||||||
| final String transactionalId, | ||||||||||||||||
| final int transactionTimeoutMs, | ||||||||||||||||
| final long retryBackoffMs, | ||||||||||||||||
| final ApiVersions apiVersions) { | ||||||||||||||||
| final ApiVersions apiVersions, | ||||||||||||||||
| final boolean enable2PC) { | ||||||||||||||||
|
Comment on lines
+260
to
+261
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Transaction Timeout ValidationThe validation prevents setting transaction.timeout.ms when 2PC is enabled, but doesn't handle the case where the default transaction.timeout.ms is still used internally. The TransactionManager constructor still receives the transactionTimeoutMs parameter without checking if 2PC is enabled.
Suggested change
Standards
|
||||||||||||||||
| this.producerIdAndEpoch = ProducerIdAndEpoch.NONE; | ||||||||||||||||
| this.transactionalId = transactionalId; | ||||||||||||||||
| this.log = logContext.logger(TransactionManager.class); | ||||||||||||||||
|
|
@@ -273,17 +276,29 @@ public TransactionManager(final LogContext logContext, | |||||||||||||||
| this.retryBackoffMs = retryBackoffMs; | ||||||||||||||||
| this.txnPartitionMap = new TxnPartitionMap(logContext); | ||||||||||||||||
| this.apiVersions = apiVersions; | ||||||||||||||||
| this.enable2PC = enable2PC; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) { | ||||||||||||||||
| shouldPoisonStateOnInvalidTransition.set(shouldPoisonState); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| public synchronized TransactionalRequestResult initializeTransactions() { | ||||||||||||||||
| return initializeTransactions(ProducerIdAndEpoch.NONE); | ||||||||||||||||
| return initializeTransactions(ProducerIdAndEpoch.NONE, false); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) { | ||||||||||||||||
| return initializeTransactions(producerIdAndEpoch, false); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) { | ||||||||||||||||
| return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
|
Comment on lines
290
to
+297
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Boolean Parameter ClarityBoolean parameters reduce readability and create implicit dependencies. Consider using enums or constants for the keepPreparedTxn parameter to improve code clarity and maintainability. Standards
|
||||||||||||||||
| synchronized TransactionalRequestResult initializeTransactions( | ||||||||||||||||
| ProducerIdAndEpoch producerIdAndEpoch, | ||||||||||||||||
| boolean keepPreparedTxn | ||||||||||||||||
| ) { | ||||||||||||||||
| maybeFailWithError(); | ||||||||||||||||
|
|
||||||||||||||||
| boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE; | ||||||||||||||||
|
|
@@ -292,14 +307,20 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc | |||||||||||||||
| if (!isEpochBump) { | ||||||||||||||||
| transitionTo(State.INITIALIZING); | ||||||||||||||||
| log.info("Invoking InitProducerId for the first time in order to acquire a producer ID"); | ||||||||||||||||
| if (keepPreparedTxn) { | ||||||||||||||||
| log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions"); | ||||||||||||||||
| } | ||||||||||||||||
| } else { | ||||||||||||||||
| log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch); | ||||||||||||||||
| } | ||||||||||||||||
| InitProducerIdRequestData requestData = new InitProducerIdRequestData() | ||||||||||||||||
| .setTransactionalId(transactionalId) | ||||||||||||||||
| .setTransactionTimeoutMs(transactionTimeoutMs) | ||||||||||||||||
| .setProducerId(producerIdAndEpoch.producerId) | ||||||||||||||||
| .setProducerEpoch(producerIdAndEpoch.epoch); | ||||||||||||||||
| .setProducerEpoch(producerIdAndEpoch.epoch) | ||||||||||||||||
| .setEnable2Pc(enable2PC) | ||||||||||||||||
| .setKeepPreparedTxn(keepPreparedTxn); | ||||||||||||||||
|
Comment on lines
316
to
+322
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. InitProducerId Request OptimizationThe InitProducerId request now includes two additional flags that increase request size. For high-throughput systems with frequent producer initialization, consider batching producer initializations or implementing a connection pooling strategy to amortize the cost of these larger requests. Standards
|
||||||||||||||||
|
|
||||||||||||||||
| InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), | ||||||||||||||||
| isEpochBump); | ||||||||||||||||
| enqueueRequest(handler); | ||||||||||||||||
|
|
||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,7 @@ | |
| import org.apache.kafka.common.requests.EndTxnResponse; | ||
| import org.apache.kafka.common.requests.FindCoordinatorRequest; | ||
| import org.apache.kafka.common.requests.FindCoordinatorResponse; | ||
| import org.apache.kafka.common.requests.InitProducerIdRequest; | ||
| import org.apache.kafka.common.requests.InitProducerIdResponse; | ||
| import org.apache.kafka.common.requests.JoinGroupRequest; | ||
| import org.apache.kafka.common.requests.MetadataResponse; | ||
|
|
@@ -102,6 +103,7 @@ | |
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.TestInfo; | ||
| import org.junit.jupiter.params.ParameterizedTest; | ||
| import org.junit.jupiter.params.provider.CsvSource; | ||
| import org.junit.jupiter.params.provider.ValueSource; | ||
| import org.mockito.MockedStatic; | ||
| import org.mockito.Mockito; | ||
|
|
@@ -1289,7 +1291,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception { | |
| ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), | ||
| FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE)); | ||
|
|
||
| Future<?> future = executor.submit(producer::initTransactions); | ||
| Future<?> future = executor.submit(() -> producer.initTransactions()); | ||
| TestUtils.waitForCondition(client::hasInFlightRequests, | ||
| "Timed out while waiting for expected `InitProducerId` request to be sent"); | ||
|
|
||
|
|
@@ -1364,6 +1366,59 @@ public void testInitTransactionWhileThrottled() { | |
| } | ||
| } | ||
|
|
||
| @ParameterizedTest | ||
| @CsvSource({ | ||
| "true, false", | ||
| "true, true", | ||
| "false, true" | ||
| }) | ||
| public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) { | ||
| Map<String, Object> configs = new HashMap<>(); | ||
| configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id"); | ||
| configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); | ||
| configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); | ||
| if (enable2PC) { | ||
| configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true); | ||
| } | ||
|
|
||
| Time time = new MockTime(1); | ||
| MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); | ||
| ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE); | ||
| MockClient client = new MockClient(time, metadata); | ||
| client.updateMetadata(initialUpdateResponse); | ||
|
|
||
| // Capture flags from the InitProducerIdRequest | ||
| boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc] | ||
|
|
||
| client.prepareResponse( | ||
| request -> request instanceof FindCoordinatorRequest && | ||
| ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(), | ||
| FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE)); | ||
|
|
||
| client.prepareResponse( | ||
| request -> { | ||
| if (request instanceof InitProducerIdRequest) { | ||
| InitProducerIdRequest initRequest = (InitProducerIdRequest) request; | ||
| requestFlags[0] = initRequest.data().keepPreparedTxn(); | ||
|
Comment on lines
+1400
to
+1402
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Null Safety CheckThe test code accesses initRequest.data() without null checking. If data() returns null, this would cause a NullPointerException, making tests unreliable and potentially masking real issues. Standards
|
||
| requestFlags[1] = initRequest.data().enable2Pc(); | ||
| return true; | ||
| } | ||
| return false; | ||
| }, | ||
| initProducerIdResponse(1L, (short) 5, Errors.NONE)); | ||
|
|
||
| try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), | ||
| new StringSerializer(), metadata, client, null, time)) { | ||
| producer.initTransactions(keepPreparedTxn); | ||
|
|
||
| // Verify request flags match expected values | ||
| assertEquals(keepPreparedTxn, requestFlags[0], | ||
| "keepPreparedTxn flag should match input parameter"); | ||
| assertEquals(enable2PC, requestFlags[1], | ||
| "enable2Pc flag should match producer configuration"); | ||
|
Comment on lines
+1412
to
+1418
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Null Safety CheckThe test accesses requestFlags array without checking if the InitProducerIdRequest was actually processed. If the request handler wasn't called, requestFlags would contain default values (false), potentially causing false test failures. Standards
|
||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testClusterAuthorizationFailure() throws Exception { | ||
| int maxBlockMs = 500; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Default Handling
The default implementation of initTransactions() hardcodes 'false' for keepPreparedTxn. This prevents configuration-based defaults and forces explicit method calls for 2PC functionality, potentially causing inconsistent transaction behavior.
Standards