Skip to content

Conversation

@visz11
Copy link
Collaborator

@visz11 visz11 commented Oct 14, 2025

User description

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.


CodeAnt-AI Description

Enable two-phase commit and allow initTransactions to retain prepared transactions

What Changed

  • New producer option transaction.two.phase.commit.enable lets the client indicate it participates in two-phase commit; when enabled, transactions started by that client do not expire on the broker
  • initTransactions gained an overload that accepts keepPreparedTxn; calling initTransactions(true) preserves any previously prepared (in-doubt) transactions so an external coordinator can finalize them instead of the producer aborting them
  • Producer initialization now sends flags to the broker to indicate two-phase commit participation and whether to keep prepared transactions; the no-argument initTransactions() still behaves the same as before
  • Configuration validation now rejects configurations that set transaction.timeout.ms while two-phase commit is enabled to avoid incompatible timeouts
  • Unit tests added and updated to verify the new config validation, the InitProducerId request flags, and initialization behavior that preserves prepared transactions

Impact

✅ Support two-phase commit transactions
✅ Prevents misconfiguration of transaction timeout with 2PC
✅ Can recover prepared transactions without aborting for external coordinators

💡 Usage Guide

Checking Your Pull Request

Every time you make a pull request, our system automatically looks through it. We check for security issues, mistakes in how you're setting up your infrastructure, and common code problems. We do this to make sure your changes are solid and won't cause any trouble later.

Talking to CodeAnt AI

Got a question or need a hand with something in your pull request? You can easily get in touch with CodeAnt AI right here. Just type the following in a comment on your pull request, and replace "Your question here" with whatever you want to ask:

@codeant-ai ask: Your question here

This lets you have a chat with CodeAnt AI about your pull request, making it easier to understand and improve your code.

Retrigger review

Ask CodeAnt AI to review the PR again, by typing:

@codeant-ai: review

Check Your Repository Health

To analyze the health of your code repository, visit our dashboard at https://app.codeant.ai. This tool helps you identify potential issues and areas for improvement in your codebase, ensuring your repository maintains high standards of code health.

Summary by CodeRabbit

  • New Features

    • Added optional two-phase commit support for producers via a new configuration.
    • Introduced initTransactions(boolean keepPreparedTxn) to allow recovery by keeping prepared transactions.
    • Added validation preventing use of transaction timeout when two-phase commit is enabled.
  • Refactor

    • Producer API gains an overload for initTransactions; existing calls remain compatible via delegation.
    • MockProducer’s initTransactions now takes a boolean parameter.
  • Tests

    • Added tests covering two-phase commit flags, transaction recovery behavior, and configuration validation.

@codeant-ai
Copy link

codeant-ai bot commented Oct 14, 2025

CodeAnt AI is reviewing your PR.


Thanks for using CodeAnt! 🎉

We're free for open-source projects. if you're enjoying it, help us grow by sharing.

Share on X ·
Reddit ·
LinkedIn

@gemini-code-assist
Copy link

Summary of Changes

Hello @visz11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces foundational client-side support for Two-Phase Commit (2PC) transactions within the Kafka producer. It provides mechanisms for producers to declare their participation in 2PC, manage transaction initialization during recovery by preserving prepared transactions, and ensures proper communication of these new transactional states to the Kafka brokers. These changes are crucial for integrating Kafka with external transaction managers in distributed systems.

Highlights

  • Two-Phase Commit (2PC) Configuration: Introduced a new producer configuration transaction.two.phase.commit.enable to explicitly inform the broker that the client is participating in a two-phase commit protocol, which implies that transactions started by this client should not expire.
  • Enhanced Transaction Initialization: Added an overloaded initTransactions(boolean keepPreparedTxn) method to the Producer interface and its implementations. This new method allows the producer to initialize transactional state while optionally retaining previously prepared transactions, a critical feature for 2PC recovery scenarios.
  • Transaction Manager Updates: The internal TransactionManager and the InitProducerIdRequest have been updated to accept and propagate the enable2PC and keepPreparedTxn flags to the Kafka broker, enabling the server-side to handle 2PC-specific transaction states.
  • Configuration Validation: Implemented validation logic to prevent conflicting configurations: transaction.timeout.ms cannot be set when transaction.two.phase.commit.enable is true, as 2PC relies on an external coordinator for transaction finalization, making broker-side timeouts irrelevant.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link

coderabbitai bot commented Oct 14, 2025

Walkthrough

Adds two-phase commit (2PC) enablement and recovery support to producer transactions. Introduces a new config flag, updates Producer and TransactionManager APIs to accept keepPreparedTxn, threads 2PC flags into InitProducerId requests, and updates tests accordingly.

Changes

Cohort / File(s) Summary
Producer API and implementation
clients/src/main/java/.../producer/KafkaProducer.java, clients/src/main/java/.../producer/Producer.java, clients/src/main/java/.../producer/MockProducer.java
Adds initTransactions(boolean keepPreparedTxn); default initTransactions() now delegates to false. KafkaProducer wires keepPreparedTxn into transaction initialization. MockProducer signature updated to accept the boolean (no behavioral change).
Transaction coordination internals
clients/src/main/java/.../producer/internals/TransactionManager.java
Extends constructor with boolean enable2PC; adds initializeTransactions(boolean). Propagates enable2Pc and keepPreparedTxn to InitProducerId requests; updates logging and initialization flow.
Producer configuration
clients/src/main/java/.../producer/ProducerConfig.java
Introduces TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG (boolean, default false) with docs. Validates incompatibility with transaction.timeout.ms when 2PC is enabled; integrates into post-processing.
Producer tests
clients/src/test/java/.../producer/KafkaProducerTest.java, clients/src/test/java/.../producer/ProducerConfigTest.java
Adds parameterized tests verifying InitProducerIdRequest flags for keepPreparedTxn/enable2Pc. Adds validation test ensuring 2PC cannot be combined with transaction timeout. Minor lambda adjustment.
Internals tests
clients/src/test/java/.../producer/internals/SenderTest.java, clients/src/test/java/.../producer/internals/TransactionManagerTest.java
Updates TransactionManager construction with new enable2PC param (false by default in tests). Expands helpers and adds tests to cover keepPreparedTxn and ongoing txn recovery with 2PC, plus request/response assertions.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor App as Application
  participant KP as KafkaProducer
  participant TM as TransactionManager
  participant NC as NetworkClient
  participant BR as Broker

  App->>KP: initTransactions(keepPreparedTxn)
  KP->>TM: initializeTransactions(keepPreparedTxn)
  Note over TM: TM constructed with enable2PC from config

  TM->>NC: Send InitProducerIdRequest<br/>(enable2Pc, keepPreparedTxn)
  NC->>BR: InitProducerIdRequest
  BR-->>NC: InitProducerIdResponse<br/>(producerId, epoch[, ongoingTxn*])
  NC-->>TM: Response
  TM-->>KP: TransactionalRequestResult
  KP-->>App: initTransactions() returns

  note over BR,TM: * If keepPreparedTxn is true and 2PC enabled,<br/>broker may return ongoing producerId/epoch for recovery.
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I twitch my ears at flags of two-phase cheer,
Flip a bit: keepPreparedTxn, recovery draws near.
Producer paws tap init with careful might,
Brokers whisper epochs in the night.
Carrots committed, no timeouts to fear—
Hop, hop: transactions, crystal-clear! 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The title “Clone kafka 19082 client side 2 pc changes pt1” includes an internal issue reference and the term “Clone,” and it does not clearly and concisely summarize the primary change to enable two-phase commit support. While it mentions “2 pc changes,” it remains overly verbose and noisy and would not quickly inform a reviewer of the core functional update. It fails to meet the guideline of providing a short, single-sentence summary of the main change. Please revise the title to a concise statement of the primary change, for example “Enable two-phase commit support in KafkaProducer,” removing internal issue numbers and extraneous terms.
Description Check ⚠️ Warning The PR description still contains the placeholder template text instructing deletion and replacement, rather than a substantive summary of the change and its testing strategy. Although an AI-generated description of behavior is present, the repository’s required template wasn’t followed and critical sections remain unfilled. This means the description does not fulfill the structure or intent of the provided template. Remove the placeholder instructions and replace them with a detailed description per the template, including a summary of what was changed, the rationale, and the testing strategy used (unit, integration, or system tests) for the new two-phase commit functionality.
Docstring Coverage ⚠️ Warning Docstring coverage is 14.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-KAFKA-19082-Client-Side-2PC-Changes-pt1

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ast-grep (0.39.6)
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@codeant-ai codeant-ai bot added the size:L This PR changes 100-499 lines, ignoring generated files label Oct 14, 2025
@refacto-visz
Copy link

refacto-visz bot commented Oct 14, 2025

Clone kafka 19082 client side 2 pc changes pt1

TL;DR: Implements comprehensive client-side improvements including HashMap to EnumMap optimizations, new transaction expiration tests, PR formatting automation, and Share consumer enhancements.


Refacto PR Summary

Major client-side refactoring focused on performance optimization and feature enhancement across multiple Kafka components.
Replaces HashMap with EnumMap in 20+ response classes for better performance, adds comprehensive transaction expiration testing, implements automated PR formatting with text wrapping, and enhances Share consumer functionality with isolation level support. This PR represents a significant client-side modernization effort spanning performance optimizations, testing infrastructure, and feature enhancements. The core changes include systematic replacement of HashMap with EnumMap in error counting methods across all response classes for improved enum-based performance, introduction of comprehensive transaction expiration integration tests covering producer ID lifecycle and fatal error scenarios, automated PR formatting system with intelligent text wrapping and markdown handling, and substantial Share consumer improvements including read committed/uncommitted isolation levels and explicit acknowledgment mode validation.

Change Highlights

Click to expand
  • clients/src/main/java/org/apache/kafka/common/requests/*Response.java: HashMap to EnumMap conversion for error counting optimization
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsExpirationTest.java: New comprehensive transaction expiration testing with TV1/TV2 support
  • .github/scripts/pr-format.py: Automated PR formatting with intelligent text wrapping and paragraph handling
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: Enhanced isolation level testing and explicit acknowledgment validation
  • clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java: Improved response handling with null record protection and server-side optimizations
  • clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: New DeleteShareGroupOffsets API testing coverage
  • clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java: PartitionResponse class cleanup removing unused lastOffset field

Sequence Diagram

sequenceDiagram
    participant Client as Kafka Client
    participant Producer as Transaction Producer
    participant Broker as Kafka Broker
    participant Consumer as Share Consumer
    participant Admin as Admin Client
    
    Client->>Producer: initTransactions()
    Producer->>Broker: Producer ID request
    Broker-->>Producer: Producer ID + Epoch
    
    Producer->>Broker: beginTransaction()
    Producer->>Broker: Send records
    Producer->>Broker: commitTransaction()
    
    Note over Producer,Broker: Producer ID expires after 5s
    
    Producer->>Broker: New transaction attempt
    Broker-->>Producer: InvalidPidMappingException
    Producer->>Producer: Fatal error state
    
    Consumer->>Broker: ShareFetch (isolation=read_committed)
    Broker-->>Consumer: Only committed records
    Consumer->>Consumer: Acknowledge records
    
    Admin->>Broker: DeleteShareGroupOffsets
    Broker-->>Admin: Deletion confirmation
Loading

Testing Guide

Click to expand
  1. Transaction Expiration: Run TransactionsExpirationTest to verify producer ID expiration handling and fatal error recovery scenarios
  2. Share Consumer Isolation: Test read_committed vs read_uncommitted isolation levels with mixed committed/aborted transactions
  3. Explicit Acknowledgment: Verify ShareConsumer throws IllegalStateException when polling with unacknowledged records in explicit mode
  4. HashMap to EnumMap: Performance test error counting methods in response classes to confirm EnumMap optimization benefits
  5. PR Formatting: Test automated PR body formatting with various markdown structures and text wrapping scenarios

@visz11
Copy link
Collaborator Author

visz11 commented Oct 14, 2025

/refacto-visz

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces client-side changes to support two-phase commit (2PC) for Kafka transactions. The changes include a new producer configuration transaction.two.phase.commit.enable, an overloaded initTransactions method to support transaction recovery without automatic aborts, and updates to the TransactionManager to handle these new options. Additionally, validation has been added to prevent the use of transaction.timeout.ms with 2PC enabled. The accompanying tests are thorough and cover the new functionality well. I have one suggestion to enhance test completeness.

Comment on lines +1370 to +1374
@CsvSource({
"true, false",
"true, true",
"false, true"
})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To enhance test completeness, consider adding the "false, false" case to your @CsvSource. While this represents the standard transaction initialization and is likely covered in other tests, its inclusion would make this parameterized test exhaustive by explicitly covering all four boolean combinations of keepPreparedTxn and enable2PC.

    @CsvSource({
        "true, false",
        "true, true",
        "false, true",
        "false, false"
    })

@refacto-visz
Copy link

refacto-visz bot commented Oct 14, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Comment on lines 618 to +619
TransactionManager transactionManager = new TransactionManager(new LogContext(), "testInitProducerIdWithPendingMetadataRequest",
60000, 100L, new ApiVersions());
60000, 100L, new ApiVersions(), false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Use the existing test-class logContext and apiVersions instances instead of creating new ones when constructing TransactionManager to ensure consistent logging and API version state across the test. [best practice]

Suggested change
TransactionManager transactionManager = new TransactionManager(new LogContext(), "testInitProducerIdWithPendingMetadataRequest",
60000, 100L, new ApiVersions());
60000, 100L, new ApiVersions(), false);
TransactionManager transactionManager = new TransactionManager(logContext, "testInitProducerIdWithPendingMetadataRequest",
60000, 100L, apiVersions, false);
Why Change? ⭐

Replacing locally-constructed LogContext and ApiVersions with the test-class fields logContext and apiVersions
is syntactically correct and safe: both fields are declared on the test class (see the top of the file) and are accessible
from this test method. The TransactionManager constructor signature matches the arguments used elsewhere in the file.
Using shared instances preserves any prior modifications to apiVersions and keeps logging consistent across tests.
No new imports or APIs are required and the change does not alter logic beyond using shared state, so it is executable
and should not introduce runtime errors.

Comment on lines 670 to +671
TransactionManager transactionManager = new TransactionManager(new LogContext(), "testNodeNotReady",
60000, 100L, new ApiVersions());
60000, 100L, new ApiVersions(), false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Replace the ad-hoc creation of LogContext and ApiVersions with the test's shared logContext and apiVersions to preserve any prior updates to apiVersions and keep logs consistent. [best practice]

Suggested change
TransactionManager transactionManager = new TransactionManager(new LogContext(), "testNodeNotReady",
60000, 100L, new ApiVersions());
60000, 100L, new ApiVersions(), false);
TransactionManager transactionManager = new TransactionManager(logContext, "testNodeNotReady",
60000, 100L, apiVersions, false);
Why Change? ⭐

The proposed change uses the existing instance fields logContext and apiVersions declared on the test class.
The constructor parameters match other occurrences in the test file and are valid. This improves consistency (e.g.
any previous updates to apiVersions are preserved) and does not change control flow. The change is syntactically valid,
compiles in the same context, and will not introduce runtime errors by itself.


private TransactionManager createTransactionManager() {
return new TransactionManager(new LogContext(), null, 0, RETRY_BACKOFF_MS, new ApiVersions());
return new TransactionManager(new LogContext(), null, 0, RETRY_BACKOFF_MS, new ApiVersions(), false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Have the helper createTransactionManager() return a TransactionManager constructed with the test-class logContext and apiVersions instances instead of creating new ones locally, so helper-created managers share the same context/version state. [best practice]

Suggested change
return new TransactionManager(new LogContext(), null, 0, RETRY_BACKOFF_MS, new ApiVersions(), false);
return new TransactionManager(logContext, null, 0, RETRY_BACKOFF_MS, apiVersions, false);
Why Change? ⭐

createTransactionManager() is a private instance helper in the test class, so it can safely reference instance fields
logContext and apiVersions. The suggested replacement matches the constructor signature and aligns with other usages
in the file. Using the shared instances is beneficial to preserve any test-level state (e.g. updates to apiVersions)
and keeps logging consistent. This change is syntactically valid and will compile and run in the current test class context.

@codeant-ai
Copy link

codeant-ai bot commented Oct 14, 2025

Pull Request Feedback 🔍

🔒 No security issues identified
⚡ Recommended areas for review

  • Missing transactional.id check
    The new two-phase commit flag (transaction.two.phase.commit.enable) indicates the client participates in 2PC.
    Two-phase commit semantics require a transactional producer (i.e., a configured transactional.id). The PR adds a validation that disallows transaction.timeout.ms when 2PC is enabled, but does not validate that transactional.id is present when 2PC is enabled. This can lead to invalid runtime behavior or confusing configurations. Please confirm intended behavior and add validation that enabling 2PC without transactional.id is rejected.

  • Idempotence requirement
    Two-phase commit participation should imply transactional semantics which in turn require idempotence. The added code does not verify that enable.idempotence (or equivalent resulting idempotenceEnabled) is true when transaction.two.phase.commit.enable is enabled. Ensure the code rejects configurations where 2PC is enabled but idempotence is disabled (or document why it's allowed).

  • Config validation
    The new transaction.two.phase.commit.enable option is read and passed into TransactionManager
    but there is no local validation that (a) 2PC requires a transactional.id, and (b) that certain
    configs (e.g. transaction.timeout.ms) are incompatible with 2PC and should be rejected. This can
    lead to surprising runtime behaviour or misconfiguration that should be rejected early.

  • Transaction timeout validation scope
    The new validation rejects transaction.timeout.ms only when the user explicitly sets it (checked via originalConfigs.containsKey). If the default timeout remains (i.e., user did not provide it), the validation won't trigger — is this intended? Review whether any non-default broker/client timeout values should be disallowed when 2PC is enabled, or whether defaults are acceptable. Update tests/docs accordingly.

  • Constructor compatibility
    The call to new TransactionManager(..., apiVersions, enable2PC) was added. Ensure the TransactionManager
    class has been updated to accept the additional apiVersions and enable2PC parameters and that the
    parameter order and semantics match the new call; otherwise this will fail to compile or behave incorrectly.

  • Constructor call sites
    The TransactionManager constructor signature changed to add ApiVersions apiVersions (moved) and boolean enable2PC. All call sites must be updated to pass the new boolean; audit consumers of this constructor to avoid build/runtime failures.

Comment on lines +45 to +52
default void initTransactions() {
initTransactions(false);
}

/**
* See {@link KafkaProducer#initTransactions(boolean)}
*/
void initTransactions(boolean keepPreparedTxn);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Revert the delegation direction to preserve compatibility: keep the existing no-arg initTransactions() signature as the abstract method and provide a default implementation for the new initTransactions(boolean) overload that delegates to the original no-arg contract. [possible issue]

@codeant-ai
Copy link

codeant-ai bot commented Oct 14, 2025

CodeAnt AI finished reviewing your PR.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (9)
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java (1)

145-159: Track the keepPreparedTxn parameter for better test support.

The new keepPreparedTxn parameter is not used or stored, which limits the mock's ability to support testing of two-phase commit behavior. Consider adding a field to track this value so tests can verify that 2PC-related flags are passed correctly.

Apply this diff to track the parameter:

+    private boolean keepPreparedTxn;
+
     @Override
     public void initTransactions(boolean keepPreparedTxn) {
         verifyNotClosed();
         verifyNotFenced();
         if (this.transactionInitialized) {
             throw new IllegalStateException("MockProducer has already been initialized for transactions.");
         }
         if (this.initTransactionException != null) {
             throw this.initTransactionException;
         }
+        this.keepPreparedTxn = keepPreparedTxn;
         this.transactionInitialized = true;
         this.transactionInFlight = false;
         this.transactionCommitted = false;
         this.transactionAborted = false;
         this.sentOffsets = false;
     }

Additionally, add a getter method for test verification:

public boolean keepPreparedTxn() {
    return this.keepPreparedTxn;
}
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (2)

358-361: Consider enhancing the documentation for clarity.

The documentation accurately describes the config, but could be more comprehensive for users unfamiliar with two-phase commit. Consider adding:

  • That an external transaction coordinator manages the transaction lifecycle (explaining why timeouts don't apply)
  • That this config requires transactional.id to be set (if that's the intended requirement)
  • When users would need this feature (e.g., coordinating Kafka transactions with external systems)

Example enhancement:

-    private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, then the broker is informed that the client is participating in " +
-            "two phase commit protocol and transactions that this client starts never expire.";
+    private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If set to true, the producer participates in " +
+            "two-phase commit (2PC) protocol with an external transaction coordinator. Transactions do not expire " +
+            "because the external coordinator is responsible for deciding when to commit or abort. " +
+            "This config requires setting a <code>" + TRANSACTIONAL_ID_CONFIG + "</code>. " +
+            "Use this when coordinating Kafka transactions with other transactional systems.";

534-538: Consider using Importance.MEDIUM instead of LOW.

This config fundamentally changes how transactions are managed (removing broker-side expiration and delegating to an external coordinator). Given that it alters core transaction semantics, Importance.MEDIUM might be more appropriate than LOW.

Apply this diff:

                                 .define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
                                         Type.BOOLEAN,
                                         false,
-                                        Importance.LOW,
+                                        Importance.MEDIUM,
                                         TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (1)

1369-1420: Solid coverage of keepPreparedTxn and 2PC flags; two small tweaks

  • Prefer the wrapper getters on InitProducerIdRequest for clarity.
  • Consider adding the fourth combination (keepPreparedTxn=false, enable2PC=false) for completeness.

Apply these diffs:

@@
-    @CsvSource({
-        "true, false",
-        "true, true",
-        "false, true"
-    })
+    @CsvSource({
+        "true, false",
+        "true, true",
+        "false, true",
+        "false, false"
+    })
@@
-                if (request instanceof InitProducerIdRequest) {
-                    InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
-                    requestFlags[0] = initRequest.data().keepPreparedTxn();
-                    requestFlags[1] = initRequest.data().enable2Pc();
+                if (request instanceof InitProducerIdRequest) {
+                    InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
+                    requestFlags[0] = initRequest.keepPreparedTxn();
+                    requestFlags[1] = initRequest.enable2Pc();
                     return true;
                 }
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (1)

141-142: Refine log message when keepPreparedTxn=true and 2PC is disabled
Wrap the existing log in an enable2PC check (or use neutral phrasing) to avoid implying 2PC when it’s disabled. For example:

@@ -308,5 +308,10 @@
             if (keepPreparedTxn) {
-                log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
+                if (enable2PC) {
+                    log.info("Invoking InitProducerId with keepPreparedTxn=true (2PC enabled)");
+                } else {
+                    log.info("Invoking InitProducerId with keepPreparedTxn=true (2PC disabled)");
+                }
             }
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (4)

179-201: Version shaping: avoid hard‑coded InitProducerId max; optionally include END_TXN

  • Use ApiKeys.INIT_PRODUCER_ID.latestVersion() instead of literal 6 to track schema bumps automatically.
  • Optional: add END_TXN ApiVersion to mirror PRODUCE/TXN_OFFSET_COMMIT gating, reducing reliance on defaults in request version negotiation.

Apply:

             new ApiVersion()
                 .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                 .setMinVersion((short) 0)
-                .setMaxVersion((short) 6),
+                .setMaxVersion(ApiKeys.INIT_PRODUCER_ID.latestVersion()),
             new ApiVersion()
                 .setApiKey(ApiKeys.PRODUCE.id)
                 .setMinVersion((short) 0)
                 .setMaxVersion(transactionV2Enabled ? ApiKeys.PRODUCE.latestVersion() : (short) 11),
+            new ApiVersion()
+                .setApiKey(ApiKeys.END_TXN.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion(transactionV2Enabled ? ApiKeys.END_TXN.latestVersion() : (short) 4),
             new ApiVersion()
                 .setApiKey(ApiKeys.TXN_OFFSET_COMMIT.id)
                 .setMinVersion((short) 0)
                 .setMaxVersion(transactionV2Enabled ? ApiKeys.TXN_OFFSET_COMMIT.latestVersion() : (short) 4)),

1052-1068: Disable‑V2 fixture: consistent; optional END_TXN entry

The targeted ApiVersions are set correctly to keep V2 off. For completeness, consider adding END_TXN version here too (as above) to make request negotiation explicit.


4049-4084: InitProducerId matcher: make 2PC/keepPrepared assertions version‑aware

If a test later downgrades InitProducerId below the version that carries keepPreparedTxn/enable2Pc, these assertions will become brittle. Guard them by request.version().

     private void prepareInitPidResponse(
         Errors error,
         boolean shouldDisconnect,
         long producerId,
         short producerEpoch,
         boolean keepPreparedTxn,
         boolean enable2Pc,
         long ongoingProducerId,
         short ongoingProducerEpoch
     ) {
         InitProducerIdResponseData responseData = new InitProducerIdResponseData()
             .setErrorCode(error.code())
             .setProducerEpoch(producerEpoch)
             .setProducerId(producerId)
             .setThrottleTimeMs(0)
             .setOngoingTxnProducerId(ongoingProducerId)
             .setOngoingTxnProducerEpoch(ongoingProducerEpoch);
 
         client.prepareResponse(body -> {
             InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
             assertEquals(transactionalId, initProducerIdRequest.data().transactionalId());
             assertEquals(transactionTimeoutMs, initProducerIdRequest.data().transactionTimeoutMs());
-            assertEquals(keepPreparedTxn, initProducerIdRequest.data().keepPreparedTxn());
-            assertEquals(enable2Pc, initProducerIdRequest.data().enable2Pc());
+            short reqVersion = initProducerIdRequest.version();
+            if (reqVersion >= 6) {
+                assertEquals(keepPreparedTxn, initProducerIdRequest.data().keepPreparedTxn());
+                assertEquals(enable2Pc, initProducerIdRequest.data().enable2Pc());
+            } else {
+                // Older versions do not carry these fields; they should effectively be false.
+                assertFalse(initProducerIdRequest.data().keepPreparedTxn());
+                assertFalse(initProducerIdRequest.data().enable2Pc());
+            }
             return true;
         }, new InitProducerIdResponse(responseData), shouldDisconnect);
     }

4410-4441: Strengthen the keepPreparedTxn test: use the helper and assert result

  • Use prepareFindCoordinatorResponse(...) for request validation consistency.
  • Capture and assert the TransactionalRequestResult from initializeTransactions(true).
 @Test
 public void testInitializeTransactionsWithKeepPreparedTxn() {
-    initializeTransactionManager(Optional.of(transactionalId), true, true);
-
-    client.prepareResponse(
-        FindCoordinatorResponse.prepareResponse(Errors.NONE, transactionalId, brokerNode)
-    );
+    initializeTransactionManager(Optional.of(transactionalId), true, true);
+    prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
@@
-    prepareInitPidResponse(
+    prepareInitPidResponse(
         Errors.NONE,
         false,
         ongoingProducerId,
         bumpedEpoch,
         true,
         true,
         ongoingProducerId,
         ongoingEpoch
     );
 
-    transactionManager.initializeTransactions(true);
+    TransactionalRequestResult result = transactionManager.initializeTransactions(true);
     runUntil(transactionManager::hasProducerId);
-    
+    assertTrue(result.isCompleted());
+    assertTrue(result.isSuccessful());
     assertTrue(transactionManager.hasProducerId());
     assertFalse(transactionManager.hasOngoingTransaction());
     assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
     assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch);
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 4a08713.

📒 Files selected for processing (9)
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (2 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (3 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (5 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (4 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java (1 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (18 hunks)
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java (1)
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (1)
  • ProducerConfig (53-686)
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (2)
clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java (1)
  • InitProducerIdRequest (26-85)
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (1)
  • ProducerConfig (53-686)
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (1)
clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java (1)
  • FindCoordinatorResponse (36-189)
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (1)
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (1)
  • ProducerConfig (53-686)
🔇 Additional comments (8)
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (3)

76-76: Import added for InitProducerIdRequest is expected for the new test; no concerns.


106-106: Import added for CsvSource is fine.


1294-1294: Good fix: disambiguate overloaded initTransactions via lambda

Using () -> producer.initTransactions() avoids ambiguity after introducing the boolean overload.

clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java (1)

149-170: Nice negative/positive coverage for 2PC vs transaction.timeout.ms conflict

The test correctly asserts the incompatibility and validates both acceptable alternatives.

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (1)

493-493: LGTM! TransactionManager constructor updates are consistent.

All 18 TransactionManager constructor call sites have been correctly updated to include the new enable2PC boolean parameter with false as the value. This maintains the existing non-2PC behavior for all existing tests while accommodating the new constructor signature.

Note: While these mechanical changes are correct, consider whether separate tests for enable2PC=true scenarios should be added to verify the new 2PC functionality. This may be out of scope for part 1 of the changes.

Also applies to: 619-619, 671-671, 1513-1513, 1798-1798, 2355-2355, 2697-2697, 2730-2730, 2787-2787, 2858-2858, 2892-2892, 2922-2922, 2948-2948, 2960-2960, 3029-3029, 3079-3079, 3118-3118, 3620-3620

clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (3)

162-163: Defaulting enable2pc to false in setup is fine

Keeps existing tests’ baseline behavior unchanged.


165-170: Overload delegation looks correct

Simple pass‑through avoids duplication.


203-204: TransactionManager ctor extended with enable2pc: OK

Constructor call aligns with the new signature.

Comment on lines +45 to +47
default void initTransactions() {
initTransactions(false);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that breaking change for external implementations is acceptable.

Converting initTransactions() from abstract to a default method that delegates to the new initTransactions(boolean) introduces a breaking change: any external implementations of the Producer interface must now implement the boolean variant. While call-site compatibility is preserved (existing callers can still use the no-arg version), implementers who previously only overrode initTransactions() will face compilation errors until they implement initTransactions(boolean keepPreparedTxn).

Run the following script to identify any external implementations that might be affected:


🏁 Script executed:

#!/bin/bash
# Description: Search for external Producer interface implementations to assess breaking change impact

# Search for classes implementing Producer interface
rg -n --type=java -A 5 -B 2 'class\s+\w+.*implements.*Producer<'

Length of output: 1948


Avoid breaking change to external Producer implementations
Adding a new abstract method initTransactions(boolean keepPreparedTxn) to the Producer interface will break any external implementers. Either make initTransactions(boolean) a default method (delegating to initTransactions()) or reserve this change for a major-version bump. Also expand the Javadoc to explain the semantics of the new keepPreparedTxn parameter.

Comment on lines +49 to +52
/**
* See {@link KafkaProducer#initTransactions(boolean)}
*/
void initTransactions(boolean keepPreparedTxn);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Expand Javadoc to explain the keepPreparedTxn parameter.

The Javadoc merely references KafkaProducer#initTransactions(boolean) without explaining the semantics of keepPreparedTxn. API consumers need to understand when to pass true vs false and how it affects transaction recovery, especially in two-phase commit scenarios.

Consider adding detailed documentation similar to this:

 /**
- * See {@link KafkaProducer#initTransactions(boolean)}
+ * Initializes transactions for this producer, optionally preserving prepared transactions
+ * during recovery.
+ *
+ * @param keepPreparedTxn If true, any prepared (not yet committed or aborted) transactions
+ *                        are preserved during initialization, enabling two-phase commit
+ *                        recovery flows. If false (default), prepared transactions are
+ *                        aborted during initialization.
+ *
+ * @see KafkaProducer#initTransactions(boolean)
  */
 void initTransactions(boolean keepPreparedTxn);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/**
* See {@link KafkaProducer#initTransactions(boolean)}
*/
void initTransactions(boolean keepPreparedTxn);
/**
* Initializes transactions for this producer, optionally preserving prepared transactions
* during recovery.
*
* @param keepPreparedTxn If true, any prepared (not yet committed or aborted) transactions
* are preserved during initialization, enabling two-phase commit
* recovery flows. If false (default), prepared transactions are
* aborted during initialization.
*
* @see KafkaProducer#initTransactions(boolean)
*/
void initTransactions(boolean keepPreparedTxn);
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/clients/producer/Producer.java around
lines 49 to 52, the Javadoc for initTransactions(boolean) only references
KafkaProducer#initTransactions(boolean) and lacks an explanation of the
keepPreparedTxn parameter; update the Javadoc to describe the semantics of
keepPreparedTxn (what true vs false means), when callers should pass true or
false (e.g., preserving prepared but uncommitted transactions for two-phase
commit/recovery vs discarding them), how it affects transaction recovery and
visibility on broker restart, and any safety/ordering or client-state
implications so API consumers can choose correctly.

Comment on lines +622 to +635

// Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
// In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
// transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
// decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
if (enable2PC && userConfiguredTransactionTimeout) {
throw new ConfigException(
"Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
" when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
" is set to true. Transactions will not expire with two-phase commit enabled."
);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Add validation that 2PC requires transactional.id.

The validation correctly prevents using transaction.timeout.ms with 2PC, but doesn't verify that 2PC requires transactional.id to be set. Two-phase commit is meaningless without transactions, so enabling 2PC without a transactional ID should fail during configuration validation rather than at runtime.

Add this validation after line 621:

         if (!idempotenceEnabled && userConfiguredTransactions) {
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
         }
+
+        // Validate that transaction.two.phase.commit.enable requires transactional.id
+        boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
+        if (enable2PC && !userConfiguredTransactions) {
+            throw new ConfigException(
+                "Cannot enable " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
+                " without setting a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + ". " +
+                "Two-phase commit requires transactional producers."
+            );
+        }

Then move the existing 2PC validation to check userConfiguredTransactionTimeout:

-        // Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
-        // In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
-        // transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
-        // decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
-        boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
-        boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
-        if (enable2PC && userConfiguredTransactionTimeout) {
+        // Validate that transaction.timeout.ms is not explicitly set when 2PC is enabled
+        if (enable2PC && originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG)) {
             throw new ConfigException(
                 "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
                 " when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
                 " is set to true. Transactions will not expire with two-phase commit enabled."
             );
         }

Clarify behavior of default transaction.timeout.ms with 2PC enabled.

The validation only prevents explicitly configured transaction.timeout.ms when 2PC is enabled, but transaction.timeout.ms has a default value (60000ms, line 525). Please verify and document whether:

  1. The default timeout is ignored when 2PC is enabled (handled in runtime code), or
  2. The timeout applies even with 2PC (which would contradict the "never expire" semantics)

If the default timeout should be ignored with 2PC, consider adding a comment explaining this behavior to prevent future confusion.


🏁 Script executed:

#!/bin/bash
set -e
# Extract lines around idempotence and transactions validation
sed -n '600,650p' clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

# Extract lines around 2PC validation
sed -n '650,700p' clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

# Locate userConfiguredTransactions definition
grep -R -n "userConfiguredTransactions" clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

# Search for any existing check requiring transactional.id when 2PC is enabled
grep -R -n "TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG" clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Length of output: 4804


Validate that transaction.two.phase.commit.enable requires transactional.id.

  • Add a config-time check after the idempotence/transactional.id validation to throw a ConfigException when transaction.two.phase.commit.enable=true and no transactional.id is set. Two-phase commit without a transactional ID is invalid.

  • Clarify in comments whether the default transaction.timeout.ms is ignored when 2PC is enabled or still enforced at runtime to match “never expire” semantics.

@visz11
Copy link
Collaborator Author

visz11 commented Oct 14, 2025

/refacto-visz

@refacto-visz
Copy link

refacto-visz bot commented Oct 14, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-visz
Copy link

refacto-visz bot commented Oct 14, 2025

Code Review: Two-Phase Commit Transaction Enhancement

👍 Well Done
Configuration Validation Added

Comprehensive validation prevents incompatible transaction timeout with two-phase commit configuration

Backward Compatibility Maintained

Default initTransactions behavior preserved through delegation pattern ensuring API consistency

📁 Selected files for review (9)
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
🎯 Custom Instructions
✅ Applied Instructions
Organization Guidelines
  • Avoid commented code in actual codebases
  • Hardcoding of variables

Scope: All files

📝 Additional Comments
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (7)
Constructor Parameter Validation

TransactionManager constructor receives enable2PC parameter but no visible validation logic ensures this flag aligns with transactionalId presence. Business rule violation could occur if 2PC is enabled without transactional configuration. Should validate enable2PC requires transactionalId to be non-null for logical consistency.

Standards:

  • Business-Rule-Input-Validation
  • Logic-Verification-Parameter-Consistency
  • Algorithm-Correctness-Precondition-Check
Transaction State Validation

Logging transaction state information could expose sensitive operational details to unauthorized parties through log files. Transaction recovery operations should be logged at debug level or with sanitized information to prevent information disclosure through log analysis.

Standards:

  • CWE-532
  • OWASP-A09
  • NIST-SSDF-RV.1
Transaction State Validation

Transaction state validation missing when keepPreparedTxn is true could allow invalid state transitions. The flag indicates prepared transactions should be preserved but no validation ensures system is in appropriate state. Invalid state preservation could cause transaction corruption or inconsistent behavior.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • DbC-Preconditions
  • SRE-Error-Handling
Constructor Parameter Expansion

Constructor parameter list expansion from 5 to 6 parameters increases object creation complexity and memory allocation overhead. Each additional parameter requires stack space and validation processing during instantiation. Consider parameter object pattern for cleaner instantiation performance.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Resource-Utilization
  • Memory-Allocation-Optimization
  • Constructor-Pattern-Efficiency
Boolean Flag Clarity

Request builder sets two boolean flags with similar but distinct meanings without validation of their logical relationship. While both relate to 2PC behavior, enable2PC indicates client capability while keepPreparedTxn indicates recovery mode. Consider validating that keepPreparedTxn=true requires enable2PC=true for logical consistency.

Standards:

  • Logic-Verification-Flag-Consistency
  • Business-Rule-Parameter-Validation
  • Algorithm-Correctness-State-Validation
Input Parameter Validation

Method accepts boolean parameter without validation of transaction manager state compatibility. Consider adding validation to ensure keepPreparedTxn=true is only allowed when 2PC is enabled to prevent inconsistent transaction behavior.

Standards:

  • CWE-20
  • OWASP-A04
  • NIST-SSDF-PW.1
Method Overload Chain

Multiple method overloads create call chain with 3-4 method invocations before reaching actual implementation. Each method call adds stack frame overhead and parameter passing cost. Direct parameter handling in fewer overloads would reduce call stack depth and improve invocation performance.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Method-Call-Optimization
  • Stack-Frame-Efficiency
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (1)
Configuration Coupling Growth

Adding another configuration parameter increases the coupling between ProducerConfig and TransactionManager construction. Each new feature requires modifying the constructor call site. Consider using a configuration object or factory pattern to reduce direct coupling and improve extensibility for future transaction-related features.

Standards:

  • SOLID-OCP
  • Design-Pattern-Factory
  • Maintainability-Quality-Coupling

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Constructor Parameter

TransactionManager constructor calls missing the new enable2PC parameter will cause compilation failures. The constructor signature changed to require boolean enable2PC parameter but existing test code not updated. Service initialization will fail preventing transaction functionality.

            TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Commitable Suggestion
Suggested change
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Fault-Tolerance

// is processed and FindCoordinator can be sent to `leastLoadedNode`.
TransactionManager transactionManager = new TransactionManager(new LogContext(), "testInitProducerIdWithPendingMetadataRequest",
60000, 100L, new ApiVersions());
60000, 100L, new ApiVersions(), false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor Signature Mismatch

TransactionManager instantiation missing required enable2PC boolean parameter causes compilation failure. Constructor signature change requires all callers to provide the new parameter but test code remains unchanged. Test execution will fail preventing validation of transaction functionality.

                60000, 100L, new ApiVersions(), false);
Commitable Suggestion
Suggested change
60000, 100L, new ApiVersions(), false);
60000, 100L, new ApiVersions(), false);
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity


TransactionManager transactionManager = new TransactionManager(new LogContext(), "testNodeNotReady",
60000, 100L, new ApiVersions());
60000, 100L, new ApiVersions(), false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Boolean Parameter

TransactionManager constructor call lacks the mandatory enable2PC parameter causing compilation errors. The constructor signature modification requires boolean parameter but existing test instantiation unchanged. Test compilation failure prevents transaction manager validation.

                60000, 100L, new ApiVersions(), false);
Commitable Suggestion
Suggested change
60000, 100L, new ApiVersions(), false);
60000, 100L, new ApiVersions(), false);
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Fault-Tolerance

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parameter Count Mismatch

TransactionManager constructor invocation missing required enable2PC boolean parameter will cause compilation failure. Constructor signature change mandates additional parameter but test code not updated accordingly. Test build failure prevents transaction reliability validation.

        TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Commitable Suggestion
Suggested change
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity

public void testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor Argument Missing

TransactionManager instantiation lacks the new enable2PC boolean parameter causing compilation errors. Constructor signature modification requires additional parameter but existing test code unchanged. Test compilation failure blocks transaction manager reliability testing.

        TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Commitable Suggestion
Suggested change
TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
TransactionManager transactionManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, false);
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Fault-Tolerance


@Override
public void initTransactions() {
public void initTransactions(boolean keepPreparedTxn) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Default Implementation

MockProducer implements parameterized initTransactions but removes default no-argument version. Interface provides default delegation to initTransactions(false), but MockProducer override breaks this delegation pattern. This creates API inconsistency where MockProducer behavior differs from KafkaProducer for default initTransactions() calls.

    @Override
    public void initTransactions() {
        initTransactions(false);
    }

    @Override
    public void initTransactions(boolean keepPreparedTxn) {
Commitable Suggestion
Suggested change
public void initTransactions(boolean keepPreparedTxn) {
@Override
public void initTransactions() {
initTransactions(false);
}
@Override
public void initTransactions(boolean keepPreparedTxn) {
Standards
  • Algorithm-Correctness-Interface-Implementation
  • Business-Rule-API-Consistency
  • Logic-Verification-Contract-Compliance

Comment on lines +534 to +538
.define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded Configuration Values

Configuration value hardcoded as 'false' default violates organization guideline against hardcoding variables. This creates inflexibility in deployment environments where different defaults may be required. Consider using environment-based configuration or making the default configurable through build-time parameters.

Standards
  • Org-Guideline-Hardcoding of variables
  • CWE-547
  • OWASP-A05

Comment on lines 286 to +301
public synchronized TransactionalRequestResult initializeTransactions() {
return initializeTransactions(ProducerIdAndEpoch.NONE);
return initializeTransactions(ProducerIdAndEpoch.NONE, false);
}

synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
return initializeTransactions(producerIdAndEpoch, false);
}

public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
}

synchronized TransactionalRequestResult initializeTransactions(
ProducerIdAndEpoch producerIdAndEpoch,
boolean keepPreparedTxn
) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method Overload Chain

Multiple overloaded methods create a complex delegation chain that reduces readability and increases maintenance burden. The four-method overload pattern makes it difficult to understand the primary implementation. Consider consolidating into fewer methods with optional parameters or using a configuration object.

Standards
  • Clean-Code-Functions
  • Maintainability-Quality-Complexity
  • SOLID-SRP

* @throws InterruptException if the thread is interrupted while blocked
*/
public void initTransactions() {
initTransactions(false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic Boolean Parameter

Boolean literal parameter reduces code readability and makes the intent unclear at call sites. The 'false' value requires developers to check method documentation to understand behavior. Consider using an enum or named constants to make the intent explicit and improve self-documenting code.

Standards
  • Clean-Code-Functions
  • Maintainability-Quality-Readability
  • Clean-Code-Meaningful-Names

@visz11
Copy link
Collaborator Author

visz11 commented Oct 14, 2025

/refacto-visz

1 similar comment
@visz11
Copy link
Collaborator Author

visz11 commented Oct 14, 2025

/refacto-visz

@refacto-visz
Copy link

refacto-visz bot commented Oct 14, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-visz
Copy link

refacto-visz bot commented Oct 14, 2025

Code Review: Two-Phase Commit Transaction Enhancement

👍 Well Done
Configuration Validation Added

Prevents incompatible transaction timeout with 2PC configuration effectively.

Backward Compatibility Maintained

Default initTransactions behavior preserved through method overloading pattern.

📁 Selected files for review (9)
  • clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  • clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  • clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  • clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
🎯 Custom Instructions
✅ Applied Instructions
Organization Guidelines
  • Avoid commented code in actual codebases
  • Hardcoding of variables

Scope: All files

📝 Additional Comments
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (1)
Parameter Validation Gap

Configuration validation only checks if transaction timeout key exists, not if enable2PC requires transactional.id to be set. Two-phase commit functionality requires transactional operations but validation doesn't enforce this dependency. Missing validation could allow invalid configuration combinations.

Standards:

  • Business-Rule-Configuration-Validation
  • Logic-Verification-Dependency-Check
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (1)
Missing Error Handling

New initTransactions overload lacks specific error handling for 2PC-related failures. keepPreparedTxn parameter introduces new failure modes without corresponding error recovery patterns. Transaction initialization failures with prepared transaction conflicts require specialized error handling for system reliability.

Standards:

  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java (1)
Boolean Parameter Clarity

Boolean parameter in public API reduces readability at call sites. Callers must remember what true/false means for keepPreparedTxn. This pattern makes code less self-documenting and increases cognitive load for developers using the API.

Standards:

  • Clean-Code-Functions
  • Clean-Code-Names
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java (1)
Transaction State Validation

New transaction initialization method accepts keepPreparedTxn parameter without validation of current transaction state. Malicious or incorrect usage could lead to transaction state corruption or unexpected behavior in two-phase commit scenarios. Consider adding validation to ensure keepPreparedTxn is only used in appropriate transaction states and with proper authorization checks.

Standards:

  • CWE-20
  • OWASP-A04

Comment on lines 257 to +261
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final boolean enable2PC) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Constructor Validation

Constructor accepts enable2PC parameter without validation against transactionTimeoutMs consistency. Configuration validation occurs in ProducerConfig but constructor allows inconsistent state creation. Runtime failures possible when enable2PC=true with non-zero transactionTimeoutMs creating conflicting timeout behaviors.

    public TransactionManager(final LogContext logContext,
                              final String transactionalId,
                              final int transactionTimeoutMs,
                              final long retryBackoffMs,
                              final ApiVersions apiVersions,
                              final boolean enable2PC) {
        if (enable2PC && transactionTimeoutMs > 0) {
            throw new IllegalArgumentException(
                "Cannot use transaction timeout when two-phase commit is enabled. " +
                "Transactions will not expire with two-phase commit enabled.");
        }
Commitable Suggestion
Suggested change
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final boolean enable2PC) {
public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions,
final boolean enable2PC) {
if (enable2PC && transactionTimeoutMs > 0) {
throw new IllegalArgumentException(
"Cannot use transaction timeout when two-phase commit is enabled. " +
"Transactions will not expire with two-phase commit enabled.");
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • DbC-Preconditions


@Override
public void initTransactions() {
public void initTransactions(boolean keepPreparedTxn) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Default Implementation

MockProducer overrides initTransactions(boolean) but removes default initTransactions() method implementation. Producer interface defines default initTransactions() calling initTransactions(false), but MockProducer signature change breaks this contract. This creates inconsistent behavior between real and mock implementations.

    @Override
    public void initTransactions() {
        initTransactions(false);
    }

    @Override
    public void initTransactions(boolean keepPreparedTxn) {
Commitable Suggestion
Suggested change
public void initTransactions(boolean keepPreparedTxn) {
@Override
public void initTransactions() {
initTransactions(false);
}
@Override
public void initTransactions(boolean keepPreparedTxn) {
Standards
  • Algorithm-Correctness-Interface-Contract
  • Business-Rule-API-Consistency

Comment on lines 257 to +261
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions) {
final ApiVersions apiVersions,
final boolean enable2PC) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor Parameter Explosion

Constructor now has 6 parameters violating Clean Code parameter limit guidelines. Adding enable2PC parameter increases coupling and makes instantiation complex. This pattern will continue growing as more transaction features are added, reducing maintainability and testability.

Standards
  • Clean-Code-Functions
  • SOLID-SRP

Comment on lines 294 to 312
public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
}

synchronized TransactionalRequestResult initializeTransactions(
ProducerIdAndEpoch producerIdAndEpoch,
boolean keepPreparedTxn
) {
maybeFailWithError();

boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
return handleCachedTransactionRequestResult(() -> {
// If this is an epoch bump, we will transition the state as part of handling the EndTxnRequest
if (!isEpochBump) {
transitionTo(State.INITIALIZING);
log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
if (keepPreparedTxn) {
log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transaction State Inconsistency

keepPreparedTxn=true allows preserving prepared transactions but lacks validation of transaction manager state compatibility. Prepared transaction preservation without proper state validation creates potential for transaction state corruption. Service reliability compromised when transaction boundaries become inconsistent.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • DbC-Invariants

Comment on lines +534 to +538
.define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded Configuration Values

Configuration parameter uses hardcoded default value 'false' instead of externalized configuration. This violates organization guideline against hardcoding variables and creates maintenance burden when default behavior needs modification. Consider using environment-based or profile-based configuration defaults to improve flexibility and maintainability.

Standards
  • CWE-547
  • OWASP-A05

Comment on lines 286 to +295
public synchronized TransactionalRequestResult initializeTransactions() {
return initializeTransactions(ProducerIdAndEpoch.NONE);
return initializeTransactions(ProducerIdAndEpoch.NONE, false);
}

synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
return initializeTransactions(producerIdAndEpoch, false);
}

public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method Overloading Chain

Four overloaded methods create a complex delegation chain with boolean parameters. This telescoping constructor anti-pattern makes the API confusing and error-prone. The multiple overloads with different parameter combinations reduce code clarity and increase maintenance burden.

Standards
  • Clean-Code-Functions
  • Design-Pattern-Builder

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

clients producer size:L This PR changes 100-499 lines, ignoring generated files

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants