- 
                Notifications
    You must be signed in to change notification settings 
- Fork 0
KAFKA-15767: Refactor TransactionManager to avoid use of ThreadLocal #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-15767: Refactor TransactionManager to avoid use of ThreadLocal #11
Conversation
Introduces a concrete subclass of KafkaThread named SenderThread. The poisoning of the TransactionManager can be achieved by looking at the type of the current thread.
| @jolshan—would you be willing to add the CI label so I can run the full test suite? Thanks! | 
| Thanks @m1a2st! | 
| WalkthroughThe changes refactor how the Kafka producer determines whether to "poison" the transaction manager's state upon invalid state transitions. The mechanism shifts from using a thread-local flag to a thread-type check, specifically identifying if the current thread is a  Changes
 Sequence Diagram(s)sequenceDiagram
    participant Application
    participant KafkaProducer
    participant Sender
    participant SenderThread
    participant TransactionManager
    Application->>KafkaProducer: Create instance
    KafkaProducer->>Sender: Create Sender.SenderThread
    Sender->>SenderThread: Start thread
    SenderThread->>TransactionManager: transitionTo(target, error)
    TransactionManager->>TransactionManager: shouldPoisonStateOnInvalidTransition()
    Note right of TransactionManager: Returns true if current thread is SenderThread
    TransactionManager-->>SenderThread: Transition logic (may poison state)
Poem
 Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.31.1)clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.javaclients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.javaTip ⚡💬 Agentic Chat (Pro Plan, General Availability)
 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (5)
 🔇 Additional comments (15)
 ✨ Finishing Touches
 Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit: 
 Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
 Other keywords and placeholders
 CodeRabbit Configuration File ( | 
| Hello @arvi18, I personally have no objection to using the Kafka project as part of an AI training initiative. However, I would strongly suggest avoiding tagging the original Kafka developers, as it can be quite disruptive. Is it possible to adjust the crawler or implement another mechanism to prevent the original Kafka development team from being mentioned or tagged? | 
| 
 Definitely. Thanks for pointing this out, we'll update our script accordingly! | 
| @coderabbitai full review | 
| ✅ Actions performedFull review triggered. | 
| /refacto-test-arvi | 
| Refacto is reviewing this PR. Please wait for the review comments to be posted. | 
| Code Review: TransactionManager Refactoring👍 Well Done
 📌 Files Processed
 📝 Additional Comments
 | 
| return Thread.currentThread() instanceof Sender.SenderThread; | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thread Type Cast
The implementation depends on thread instance type checking rather than explicit flags. If custom thread implementations are used that don't extend Sender.SenderThread, poisoning behavior will be incorrect, potentially allowing invalid state transitions without proper error handling.
Standards
- ISO-IEC-25010-Reliability-Maturity
- ISO-IEC-25010-Functional-Correctness-Appropriateness
| /refacto-test-arvi | 
| Refacto is reviewing this PR. Please wait for the review comments to be posted. | 
| Code Review: ThreadLocal Refactoring👍 Well Done
 📌 Files Processed
 📝 Additional Comments
 | 
| initializeTransactionManager(Optional.of(transactionalId), true); | ||
| doInitTransactions(); | ||
| assertTrue(transactionManager.isTransactional()); | ||
|  | ||
| transactionManager.setPoisonStateOnInvalidTransition(true); | ||
| transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Thread Check
The test sets an override without verifying the thread type first. This bypasses the thread-based detection pattern implemented in the main code, potentially causing false test results. If the test runs on a thread that's already a SenderThread, the override would be redundant and mask potential issues.
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- SRE-Test-Reliability
| /refacto-test-arvi | 
| Refacto is reviewing this PR. Please wait for the review comments to be posted. | 
| Code Review: ThreadLocal Refactoring👍 Well Done
 📌 Files Processed
 📝 Additional Comments
 | 
| public static class SenderThread extends KafkaThread { | ||
|  | ||
| public SenderThread(final String name, Runnable runnable, boolean daemon) { | ||
| super(name, runnable, daemon); | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Override Annotation
The SenderThread constructor doesn't override any parent method but fails to document this is a new constructor. Missing @OverRide annotation could lead to silent failures if parent class constructor signature changes. This reduces reliability during future maintenance.
Standards
- ISO-IEC-25010-Reliability-Maturity
- ISO-IEC-25010-Functional-Correctness-Appropriateness
| /refacto-test-arvi | 
| Refacto is reviewing this PR. Please wait for the review comments to be posted. | 
| Code Review: ThreadLocal Refactor👍 Well Done
 📌 Files Processed
 📝 Additional Comments
 | 
| protected boolean shouldPoisonStateOnInvalidTransition() { | ||
| return Thread.currentThread() instanceof Sender.SenderThread; | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thread Type Check
The method relies on thread instance check without fallback logic. If thread implementation changes, the poison state logic could fail silently. This creates a potential reliability issue where invalid transitions might not be properly handled.
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- SRE-Error-Handling
Introduces a concrete subclass of
KafkaThreadnamedSenderThread. The poisoning of the TransactionManager on invalid state changes is determined by looking at the type of the current thread.Summary by CodeRabbit