KAFKA-17871: avoid blocking the herder thread when producer flushing hangs #21
Conversation
…9521) This patch addresses issue apache#19516 and corrects a typo in `ApiKeyVersionsProvider`: when `toVersion` exceeds `latestVersion`, the `IllegalArgumentException` message was erroneously formatted with `fromVersion`. The format argument has been updated to use `toVersion` so that the error message reports the correct value. Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai <[email protected]>
…pache#19302) Move the static fields/methods Reviewers: Luke Chen <[email protected]>
The check for `scheduler.pendingTaskSize()` may fail if the thread pool is too slow to consume the runnable objects Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
…#17099) Two sets of tests are added: 1. KafkaProducerTest - when the send succeeds, both record.headers() and the onAcknowledgement headers are read-only; when the send fails, record.headers() is writable as before and the onAcknowledgement headers are read-only. 2. ProducerInterceptorsTest - verifies that both the old and new onAcknowledgement methods are called successfully. Reviewers: Lianet Magrans <[email protected]>, Omnia Ibrahim <[email protected]>, Matthias J. Sax <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
about https://github.com/apache/kafka/pull/19387/files#r2052025917 Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>, TengYao Chi <[email protected]>
…pache#19437) This PR adds support for remote storage fetch for share groups. There is a limitation in remote storage fetch for consumer groups: we can only perform a remote fetch for a single topic partition in a fetch request. Since the logic of share fetch requests is largely based on how consumer groups work, we follow similar logic in implementing remote storage fetch. This limitation should be addressed as part of KAFKA-19133, which should allow fetching multiple remote topic partitions in a single share fetch request. Reviewers: Jun Rao <[email protected]>
As the title. Ticket: https://issues.apache.org/jira/browse/KAFKA-19179 Reviewers: PoAn Yang <[email protected]>, Jhen-Yung Hsu <[email protected]>, TengYao Chi <[email protected]>, Nick Guo <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
Note: this is an apache#18018 offshoot. See this comment made by @Goooler: apache#18018 (comment)  Reviewers: Apoorv Mittal <[email protected]>, David Arthur <[email protected]>, Goooler <[email protected]>
The release script was pushing the RC tag off of a temporary branch that was never merged back into the release branch. This meant that our RC and release tags were detached from the rest of the repository. This patch changes the release script to merge the RC tag back into the release branch and pushes both the tag and the branch. Reviewers: Luke Chen <[email protected]>
This PR removes the unstable API flag for the KIP-932 RPCs. The 4 RPCs which were exposed for the early access release in AK 4.0 are stabilised at v1. This is because the RPCs have evolved over time and AK 4.0 clients are not compatible with AK 4.1 brokers. By stabilising at v1, the API version checks prevent incompatible communication and server-side exceptions when trying to parse the requests from the older clients. Reviewers: Apoorv Mittal <[email protected]>
…19500) Currently the share session cache is designed like the fetch session cache. If the cache is full and a new share session is trying to get initialized, then sessions which haven't been touched for more than two minutes are evicted. This isn't right for share sessions, as the members also hold locks on the acquired records, and session eviction would mean those locks need to be dropped and the corresponding records re-delivered. This PR removes the time-based eviction logic for share sessions. Refer: [KAFKA-19159](https://issues.apache.org/jira/browse/KAFKA-19159) Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai <[email protected]>
Small improvements to share consumer javadoc. Reviewers: Apoorv Mittal <[email protected]>
Updated the Kafka Streams documentation to include metrics for tasks, process nodes, and threads that were missing. I was unable to find metrics such as stream-state-metrics, client-metrics, state-store-metrics, and record-cache-metrics in the codebase, so they are not included in this update. Reviewers: Bill Bejeck <[email protected]>
…ache#19416) This change implements upgrading the kraft version from 0 to 1 in existing clusters. Previously, clusters were formatted with either version 0 or version 1, and could not be moved between them. The kraft version for the cluster metadata partition is recorded using the KRaftVersion control record. If there is no KRaftVersion control record, the default kraft version is 0. The kraft version is upgraded using the UpdateFeatures RPC. These RPCs are handled by the QuorumController and FeatureControlManager. This change adds special handling in the FeatureControlManager so that upgrades to the kraft.version are directed to RaftClient#upgradeKRaftVersion. To allow the FeatureControlManager to call RaftClient#upgradeKRaftVersion in a non-blocking fashion, the kraft version upgrade uses optimistic locking. The call to RaftClient#upgradeKRaftVersion validates the version change. If the validations succeed, it generates the necessary control records and adds them to the BatchAccumulator. Before the kraft version can be upgraded to version 1, all of the brokers and controllers in the cluster need to support kraft version 1. The check that all brokers support kraft version 1 is done by the FeatureControlManager. The check that all of the controllers support kraft version 1 is done by KafkaRaftClient and LeaderState. When the kraft version is 0, the kraft leader starts by assuming that no voters support kraft version 1. The leader discovers which voters support kraft version 1 through the UpdateRaftVoter RPC. The KRaft leader handles UpdateRaftVoter RPCs by storing the updated information in-memory until the kraft version is upgraded to version 1. This state is stored in LeaderState and contains the latest directory id, endpoints and supported kraft version for each voter. Only when the KRaft leader has received an UpdateRaftVoter RPC from all of the voters will it allow the upgrade from kraft.version 0 to 1.
Reviewers: Alyssa Huang <[email protected]>, Colin P. McCabe <[email protected]>
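The optimistic-locking idea described above can be sketched with stdlib types only. Everything here is a hypothetical stand-in (the real logic lives in FeatureControlManager, KafkaRaftClient, and LeaderState): the upgrade path validates preconditions, then installs the new version atomically and fails if it loses the race.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of an optimistic kraft-version upgrade (hypothetical names).
public class KraftUpgradeSketch {
    enum Version { V0, V1 }

    private final AtomicReference<Version> kraftVersion = new AtomicReference<>(Version.V0);

    // Validate first; then try to install the new version atomically.
    boolean upgradeToV1(boolean allVotersSupportV1) {
        if (!allVotersSupportV1) return false;                       // validation step
        return kraftVersion.compareAndSet(Version.V0, Version.V1);   // optimistic install
    }

    public static void main(String[] args) {
        KraftUpgradeSketch c = new KraftUpgradeSketch();
        System.out.println(c.upgradeToV1(false)); // voters not ready yet
        System.out.println(c.upgradeToV1(true));  // upgrade succeeds
        System.out.println(c.upgradeToV1(true));  // already at V1, CAS fails
    }
}
```

The compareAndSet call is what makes the path non-blocking: no lock is held while validations run, and a concurrent change simply causes the install to fail.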
This patch extends the OffsetCommit API to support topic ids. From version 10 of the API, topic ids must be used. Originally, we wanted to support both using topic ids and topic names from version 10 but it turns out that it makes everything more complicated. Hence we propose to only support topic ids from version 10. Clients which only support using topic names can either lookup the topic ids using the Metadata API or stay on using an earlier version. The patch only contains the server side changes and it keeps the version 10 as unstable for now. We will mark the version as stable when the client side changes are merged in. Reviewers: Lianet Magrans <[email protected]>, PoAn Yang <[email protected]>
…a result of change in assignor algorithm (apache#19541) The system test `ShareConsumerTest.test_share_multiple_partitions` started failing because of the recent change in the SimpleAssignor algorithm. The test assumed that if a share group is subscribed to a topic, then every share consumer in the group will be assigned all partitions of the topic. That no longer happens: partitions are split between the share consumers in certain cases, so some partitions are assigned to only a subset of share consumers. This change removes that assumption. Reviewers: PoAn Yang <[email protected]>, Andrew Schofield <[email protected]>
…ionCache (apache#19505) This PR removes the group.share.max.groups config. This config was used to calculate the maximum size of the share session cache. With the new config group.share.max.share.sessions, which serves exactly this purpose, the ShareSessionCache initialization is now passed the new config instead. Refer: [KAFKA-19156](https://issues.apache.org/jira/browse/KAFKA-19156) Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
…ache#19443) * There could be scenarios where share partition records in `__share_group_state` internal topic are not updated for a while implying these partitions are basically cold. * In this situation, the presence of these holds back the pruner from keeping the topic clean and of manageable size. * To remedy the situation, we have added a periodic `setupSnapshotColdPartitions` in `ShareCoordinatorService` which does a writeAll operation on the associated shards in the coordinator and forces snapshot creation for any cold partitions. In this way the pruner can continue. This job has been added as a timer task. * A new internal config `share.coordinator.cold.partition.snapshot.interval.ms` has been introduced to set the period of the job. * Any failures are logged and ignored. * New tests have been added to verify the feature. Reviewers: PoAn Yang <[email protected]>, Andrew Schofield <[email protected]>
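The periodic job described above has the usual timer-task shape. Below is a stdlib-only sketch with hypothetical names; the real task runs inside ShareCoordinatorService with its period taken from `share.coordinator.cold.partition.snapshot.interval.ms`. As in the commit message, failures are caught and ignored so one bad run does not cancel the schedule.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a periodic "snapshot cold partitions" job (hypothetical names).
public class ColdSnapshotJobSketch {
    public static void main(String[] args) throws Exception {
        AtomicInteger runs = new AtomicInteger();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        Runnable setupSnapshotColdPartitions = () -> {
            try {
                runs.incrementAndGet(); // would force snapshots for cold partitions here
            } catch (Exception e) {
                // failures are logged and ignored; the schedule keeps running
                System.err.println("snapshot job failed, ignoring: " + e);
            }
        };

        timer.scheduleAtFixedRate(setupSnapshotColdPartitions, 0, 10, TimeUnit.MILLISECONDS);
        Thread.sleep(100);
        timer.shutdown();
        System.out.println(runs.get() >= 2); // the job fired repeatedly
    }
}
```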
Improves a variable name and handling of an Optional. Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>, PoAn Yang <[email protected]>
…pache#19440) Introduces a concrete subclass of `KafkaThread` named `SenderThread`. The poisoning of the TransactionManager on invalid state changes is determined by looking at the type of the current thread. Reviewers: Chia-Ping Tsai <[email protected]>
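The thread-type check described above can be illustrated with a stdlib-only sketch. `SenderThread` is a real class name from the PR, but the class below is a stand-in (the actual one extends `KafkaThread`):

```java
// Sketch: decide behavior by inspecting the type of the current thread.
public class ThreadTypeSketch {
    static class SenderThread extends Thread {           // stand-in for Sender.SenderThread
        SenderThread(Runnable r) { super(r, "kafka-producer-network-thread"); }
    }

    // Poison the transaction state only when running on the sender thread.
    static boolean shouldPoisonStateOnInvalidTransition() {
        return Thread.currentThread() instanceof SenderThread;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(shouldPoisonStateOnInvalidTransition()); // main thread: false
        Thread t = new SenderThread(() ->
            System.out.println(shouldPoisonStateOnInvalidTransition())); // sender: true
        t.start();
        t.join();
    }
}
```

The `instanceof` check replaces per-call flags: any code path can ask "am I on the sender thread?" without threading a boolean through the call stack.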
…pache#19457) - Construct `AsyncKafkaConsumer` constructor and verify that the `RequestManagers.supplier()` contains Streams-specific data structures. - Verify that `RequestManagers` constructs the Streams request managers correctly - Test `StreamsGroupHeartbeatManager#resetPollTimer()` - Test `StreamsOnTasksRevokedCallbackCompletedEvent`, `StreamsOnTasksAssignedCallbackCompletedEvent`, and `StreamsOnAllTasksLostCallbackCompletedEvent` in `ApplicationEventProcessor` - Test `DefaultStreamsRebalanceListener` - Test `StreamThread`. - Test `handleStreamsRebalanceData`. - Test `StreamsRebalanceData`. Reviewers: Lucas Brutschy <[email protected]>, Bill Bejeck <[email protected]> Signed-off-by: PoAn Yang <[email protected]>
…he#19547) Change the log messages which used to warn that KIP-932 was an Early Access feature to say that it is now a Preview feature. This will make the broker logs far less noisy when share groups are enabled. Reviewers: Apoorv Mittal <[email protected]>
The generated response data classes take Readable as input to parse the response. However, the associated response objects take ByteBuffer as input and thus convert it to Readable via `new ByteBufferAccessor`. This PR changes the parse method of all the response classes to take the Readable interface instead, so that no such conversion is needed. To support parsing the ApiVersionsResponse twice for different versions, this change adds a "slice" method to the Readable interface. Reviewers: José Armando García Sancio <[email protected]>, Truc Nguyen <[email protected]>, Aadithya Chandra <[email protected]>
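Conceptually, the new "slice" method works like `ByteBuffer.slice()`: it yields an independent read position over the same underlying bytes, which is what allows the same payload to be parsed twice. A stdlib-only illustration of that property:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Two slices of the same buffer have independent positions over shared content,
// so consuming one does not disturb the other.
public class SliceSketch {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("api-versions".getBytes(StandardCharsets.UTF_8));

        ByteBuffer first = buf.slice();   // first "parse" attempt
        ByteBuffer second = buf.slice();  // second attempt, e.g. with a fallback version

        byte[] a = new byte[first.remaining()];
        first.get(a);                     // fully consumes the first slice
        byte[] b = new byte[second.remaining()];
        second.get(b);                    // second slice is unaffected

        System.out.println(new String(a, StandardCharsets.UTF_8));
        System.out.println(new String(b, StandardCharsets.UTF_8));
    }
}
```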
…#19549) The heartbeat logic for share groups is tricky when the set of topic-partitions eligible for assignment changes. We have observed epoch mismatches when brokers are restarted, which should not be possible. This change improves the logging so we can see the previous member epoch and tally it with the logged state. Reviewers: Apoorv Mittal <[email protected]>, Sushant Mahajan <[email protected]>
…19536) This PR marks the records as non-nullable for ShareFetch. This PR follows the changes for Fetch (apache#18726); some work for ShareFetch was done in apache#19167. I tested with marking `records` as non-nullable in ShareFetch, which required additional handling. The same has been fixed in the current PR. Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>, TengYao Chi <[email protected]>, PoAn Yang <[email protected]>
…tProducerId (KIP-939) (apache#19429) This is part of the client side changes required to enable 2PC for KIP-939 **Producer Config:** transaction.two.phase.commit.enable The default would be ‘false’. If set to ‘true’, the broker is informed that the client is participating in two phase commit protocol and transactions that this client starts never expire. **Overloaded InitProducerId method** If the value is 'true' then the corresponding field is set in the InitProducerIdRequest Reviewers: Justine Olshan <[email protected]>, Artem Livshits <[email protected]>
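Assuming standard producer configuration, opting in to the new behavior might look like the snippet below. The config name `transaction.two.phase.commit.enable` comes from the commit message above; the surrounding bootstrap and transactional-id values are ordinary placeholders.

```java
import java.util.Properties;

// Sketch of enabling KIP-939 two-phase commit on a transactional producer.
public class TwoPcConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // placeholder
        props.put("transactional.id", "external-tm-txn-1");     // placeholder
        // Default is "false". When "true", the broker is told this client
        // participates in 2PC and its transactions never expire.
        props.put("transaction.two.phase.commit.enable", "true");

        System.out.println(props.getProperty("transaction.two.phase.commit.enable"));
    }
}
```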
…ics) (apache#17988) Reviewers: Greg Harris <[email protected]>
This patch does a few code changes: * It cleans up the GroupCoordinatorService; * It moves the helper methods to validate request to Utils; * It moves the helper methods to create the assignment for the ConsumerGroupHeartbeatResponse and the ShareGroupHeartbeatResponse from the GroupMetadataManager to the respective classes. Reviewers: Chia-Ping Tsai <[email protected]>, Jeff Kim <[email protected]>
…rvers (apache#19545) Old bootstrap.metadata files cause problems with servers that include KAFKA-18601. When the server tries to read the bootstrap.checkpoint file, it fails if the metadata.version is older than 3.3-IV3 (feature level 7). This causes problems when these clusters are upgraded. This PR makes it possible to represent older MVs in BootstrapMetadata objects without causing an exception. An exception is thrown only if we attempt to access the BootstrapMetadata. This ensures that only the code path in which we start with an empty metadata log checks that the metadata version is 7 or newer. Reviewers: José Armando García Sancio <[email protected]>, Ismael Juma <[email protected]>, PoAn Yang <[email protected]>, Liu Zeyu <[email protected]>, Alyssa Huang <[email protected]>
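The "fail on access, not on load" pattern described above can be sketched as follows. The class shape is hypothetical (the real class is BootstrapMetadata in the Kafka server); the feature-level-7 threshold is from the commit message.

```java
// Sketch: represent an old metadata.version without error; validate lazily.
public class BootstrapMetadataSketch {
    private final short featureLevel;

    BootstrapMetadataSketch(short featureLevel) {
        this.featureLevel = featureLevel;   // no validation at construction time
    }

    short metadataVersionLevel() {
        if (featureLevel < 7) {             // 3.3-IV3 is feature level 7
            throw new IllegalStateException("metadata.version too old: " + featureLevel);
        }
        return featureLevel;
    }

    public static void main(String[] args) {
        BootstrapMetadataSketch old = new BootstrapMetadataSketch((short) 3);
        System.out.println("constructed");  // merely representing an old MV is fine
        try {
            old.metadataVersionLevel();     // only an access triggers the check
        } catch (IllegalStateException e) {
            System.out.println("rejected on access");
        }
    }
}
```

Deferring the check means upgrade paths that never read the bootstrap metadata (because the log is non-empty) are unaffected by an old checkpoint file.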
Replace names like a, b, c, ... with meaningful names in AsyncKafkaConsumerTest. Follow-up: apache#19457 (comment) Signed-off-by: PoAn Yang <[email protected]> Reviewers: Bill Bejeck <[email protected]>, Ken Huang <[email protected]>
…pache#19450) Kafka Streams calls `prepareCommit()` in `Taskmanager#closeTaskDirty()`. However, a dirty task must not be committed, so prepare-commit steps such as getting offsets are not needed either. The only thing needed before closing a task dirty is flushing. Therefore, separating `flush` from `prepareCommit` is a good fix. Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
…ache#19548) If a streams, share or consumer group is described, all group IDs are sent to all shards of the group coordinator. This change fixes that. It is tested in the unit tests, since it's somewhat inconvenient to test the passed read-operation lambda. Reviewers: David Jacot <[email protected]>, Andrew Schofield <[email protected]>
apache#19552) This PR just resolves an NPE when a topic assigned in a share group is deleted. The NPE is caused by code which uses the current metadata image to convert from a topic ID to the topic name. For a deleted topic, there is no longer any entry in the image. A future PR will properly handle the topic deletion. Reviewers: Apoorv Mittal <[email protected]>, PoAn Yang <[email protected]>
If the streams rebalance protocol is enabled in StreamsUncaughtExceptionHandlerIntegrationTest, the streams application does not shut down correctly upon error. There are two causes. First, sometimes the SHUTDOWN_APPLICATION code was only sent with the leave heartbeat, but that is not handled broker-side. Second, the SHUTDOWN_APPLICATION code wasn't properly handled client-side at all. Reviewers: Bruno Cadonna <[email protected]>, Bill Bejeck <[email protected]>, PoAn Yang <[email protected]>
…upMetadataValue (apache#19504) * Add MetadataHash field to ConsumerGroupMetadataValue, ShareGroupMetadataValue, and StreamGroupMetadataValue. * Add metadataHash field to GroupCoordinatorRecordHelpers#newConsumerGroupEpochRecord, GroupCoordinatorRecordHelpers#newShareGroupEpochRecord, and StreamsCoordinatorRecordHelpers#newStreamsGroupEpochRecord. * Add deprecated message to ConsumerGroupPartitionMetadataKey and ConsumerGroupPartitionMetadataValue. * ShareGroupPartitionMetadataKey / ShareGroupPartitionMetadataValue / StreamGroupPartitionMetadataKey / StreamGroupPartitionMetadataValue will be removed in next PR. Reviewers: Lucas Brutschy <[email protected]>, David Jacot <[email protected]> --------- Signed-off-by: PoAn Yang <[email protected]>
…hangs The call to `backingStore.get()` (called by connector task threads through `OffsetStorageReaderImpl.offsets()`) can block for a long time waiting for a data flush to complete (`KafkaProducer.flush()`). This change moves that call outside the synchronized block that locks `offsetReadFutures`, so that if `backingStore.get()` hangs it does not keep `offsetReadFutures` locked. The check of the `closed` flag (`closed.get()`) is kept inside the synchronized block to avoid a race condition with `close()`. This is important because `OffsetStorageReaderImpl.close()` also needs to lock `offsetReadFutures` in order to cancel the futures. Since the herder thread calls `OffsetStorageReaderImpl.close()` when attempting to stop a task, before this change the herder thread could hang indefinitely waiting for `backingStore.get()` to complete.
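The locking change described above can be sketched as follows. This is a minimal, self-contained illustration of the pattern, not the actual Connect classes: the field names `offsetReadFutures` and `closed` mirror the description, everything else is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Sketch: keep the closed-check and future registration under the lock,
// but run the potentially slow backing-store read outside it, so close()
// (which needs the same lock to cancel futures) can always proceed.
public class OffsetReaderSketch {
    private final List<Future<?>> offsetReadFutures = new ArrayList<>();
    private volatile boolean closed = false;

    Object offsets(Callable<Object> backingStoreGet) throws Exception {
        FutureTask<Object> task = new FutureTask<>(backingStoreGet);
        synchronized (offsetReadFutures) {
            if (closed) throw new IllegalStateException("reader is closed");
            offsetReadFutures.add(task);   // registration stays under the lock
        }
        task.run();                        // slow call now OUTSIDE the lock
        return task.get();
    }

    void close() {
        synchronized (offsetReadFutures) { // never blocked by a hung read
            closed = true;
            for (Future<?> f : offsetReadFutures) f.cancel(true);
            offsetReadFutures.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        OffsetReaderSketch reader = new OffsetReaderSketch();
        System.out.println(reader.offsets(() -> "offsets"));
        reader.close();
    }
}
```

If the read in `offsets()` hangs, `close()` can still acquire the lock, mark the reader closed, and cancel the registered future, which is exactly the property the herder thread needs.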
🤖 Refacto.AI: PR Summary & Reviewer Guide

🎯 1. The Goal (Why was this PR raised?)

This PR implements KAFKA-17871, which focuses on avoiding blocking the herder thread when producer flushing hangs. The changes also involve significant modifications to the Kafka Raft implementation, transaction management, and KRaft version upgrade capabilities.

🗺️ 2. The Changes (What was implemented?)

🤔 3. Key Areas for Human Review

The most important areas to focus review on, including specific testing instructions:

- Area of Concern: KRaft Version Upgrade Handling
- Area of Concern: Transaction Management Thread Safety
- Area of Concern: UpdateVoterHandler Logic
- Area of Concern: Feature Control for KRaft Version
Summary of Changes
Hello @arvi18, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces several significant enhancements across the Kafka ecosystem. My primary focus was to address a critical reliability issue in the Connect framework, preventing the herder thread from blocking when producer flushing operations hang. Beyond this, I've advanced the maturity of the 'Share Group' feature, moving it from an experimental 'early access' to a more stable 'preview' state. A major new capability is the introduction of two-phase commit (2PC) support for transactional producers, enabling external coordination of transactions. I've also improved producer interceptors by providing access to record headers in acknowledgment callbacks. Furthermore, this PR lays the groundwork for KRaft versioning and upgrade mechanisms, and significantly enhances the Connect framework's observability by adding plugin versioning and metrics. Finally, I've refined API parsing to use a more flexible Readable interface and integrated topic IDs into the consumer group protocol.
Highlights
- Connect Framework Reliability: I've implemented a crucial fix to prevent the Connect framework's herder thread from blocking indefinitely when a producer's flushing operation hangs. This significantly enhances the overall stability and resilience of the Connect cluster.
- Share Group Feature Maturity: The 'Share Group' feature (KIP-932) has matured from an 'early access' experimental state to a more stable 'preview feature'. This involved updating documentation, removing `unstable.api.versions.enable` flags from tests, and refining related admin client APIs.
- Transactional Producer Enhancements: I've introduced robust support for two-phase commit (2PC) in transactional producers. This allows external transaction managers to coordinate the finalization of transactions, ensuring greater data consistency and flexibility in distributed systems.
- Producer Interceptor Improvements: Producer interceptors now receive record headers in their `onAcknowledgement` callbacks. This provides more context for custom interceptor logic, allowing for richer data processing and monitoring capabilities.
- KRaft Versioning and Upgrade: I've added the foundational mechanisms for updating and tracking KRaft protocol versions. This includes handling voter set updates during version transitions, ensuring a smoother and more controlled evolution of the KRaft cluster.
- Connect Plugin Versioning and Metrics: The Connect framework now tracks and exposes detailed version information and metrics for connectors, tasks, and their associated plugins (converters, transforms, predicates). This provides better visibility into the operational state and composition of Connect deployments.
- API Refinements: I've standardized API request and response parsing across various client and server components by transitioning from `ByteBuffer` to `Readable`. This improves the flexibility and consistency of data handling within the Kafka ecosystem.
- Consumer Group Protocol (KIP-848) Updates: I've integrated topic IDs into `OffsetCommitRequest` and `OffsetCommitResponse` as part of the ongoing work for the new consumer group protocol (KIP-848). This is a step towards more efficient and robust consumer group management.
Code Review
This is a substantial pull request that introduces several significant changes and refactorings across the codebase. The primary goal, to prevent the Connect herder thread from blocking, appears to be correctly addressed in OffsetStorageReaderImpl by moving a potentially long-running operation out of a synchronized block.
Other major changes include the stabilization of Share Group APIs (KIP-932), the addition of topic ID support to the OffsetCommit API (KIP-848), and a new feature to allow upgrading the KRaft version. The codebase also benefits from several cleanups, such as a large-scale refactoring to use a Readable interface for response parsing and centralizing LogManager logic. The changes are well-implemented and include corresponding tests.
I have a couple of minor suggestions to improve code consistency in AsyncKafkaConsumer.java by applying a refactoring pattern uniformly across similar methods.
In invokeOnTasksAssignedCallback:

```diff
         final CompletableFuture<Void> future) {
     final Optional<KafkaException> error;
-    final Optional<Exception> exceptionFromCallback = streamsGroupRebalanceCallbacks().onTasksAssigned(assignment);
+    final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onTasksAssigned(assignment);
```
For consistency with invokeOnTasksRevokedCallback, this method could also be refactored to use Optional.map to simplify the logic for handling the optional exception. This would make the code more concise and uniform across similar methods in this class.
For example:
```java
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onTasksAssigned(assignment);
final Optional<KafkaException> error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task assignment callback throws an error"));
return new StreamsOnTasksAssignedCallbackCompletedEvent(future, error);
```

In invokeOnAllTasksLostCallback:

```diff
 private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
     final Optional<KafkaException> error;
-    final Optional<Exception> exceptionFromCallback = streamsGroupRebalanceCallbacks().onAllTasksLost();
+    final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onAllTasksLost();
```
Similar to invokeOnTasksRevokedCallback, this method can be simplified by using Optional.map to handle the optional exception. This would improve code consistency within the class.
For example:

```java
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onAllTasksLost();
final Optional<KafkaException> error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "All tasks lost callback throws an error"));
return new StreamsOnAllTasksLostCallbackCompletedEvent(future, error);
```
Code Review: Thread Safety and KRaft Version Upgrade
```java
this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(() -> false);
this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
```
ThreadLocal Resource Leak
ThreadLocal creates memory leaks in thread pools where threads are reused. The current implementation doesn't properly clean up ThreadLocal values, potentially causing memory growth under load and resource exhaustion in production environments.
Suggested change:

```diff
 this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
+protected boolean shouldPoisonStateOnInvalidTransition() {
+    return Thread.currentThread() instanceof Sender.SenderThread;
+}
```
Standards
- CWE-567
- OWASP-A05
- ISO-IEC-25010-Performance-Resource-Utilization
```java
protected boolean shouldPoisonStateOnInvalidTransition() {
    return Thread.currentThread() instanceof Sender.SenderThread;
}
```
Thread Safety Issue
Thread identification using instanceof is fragile and can lead to false negatives. If thread implementation changes, the condition fails silently causing reliability issues.
Suggested change:

```diff
 protected boolean shouldPoisonStateOnInvalidTransition() {
-    return Thread.currentThread() instanceof Sender.SenderThread;
+    Thread currentThread = Thread.currentThread();
+    return currentThread instanceof Sender.SenderThread ||
+        (currentThread.getName() != null && currentThread.getName().contains("kafka-sender"));
 }
```
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Completeness
- SRE-Error-Handling
```java
if (localVoterNode.voterKey().directoryId().isEmpty()) {
    throw new IllegalArgumentException(
        String.format("Unknown local replica directory id: %s", localVoterNode)
    );
} else if (!voterSetAtEpochStart.isVoter(localVoterNode.voterKey())) {
    throw new IllegalArgumentException(
        String.format(
```
Missing Directory Validation
Directory ID validation only checks if empty but not for malformed values. Could allow invalid directory IDs to pass validation. Potential for inconsistent voter state.
Suggested change:

```diff
 if (localVoterNode.voterKey().directoryId().isEmpty()) {
     throw new IllegalArgumentException(
         String.format("Unknown local replica directory id: %s", localVoterNode)
     );
+} else if (!UUID.nameUUIDFromBytes(localVoterNode.voterKey().directoryId().get()).equals(localVoterNode.voterKey().directoryId().get())) {
+    throw new IllegalArgumentException(
+        String.format("Malformed local replica directory id: %s", localVoterNode.voterKey().directoryId().get())
+    );
 } else if (!voterSetAtEpochStart.isVoter(localVoterNode.voterKey())) {
```
Standards
- CWE-20
- OWASP-A03
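One concrete way to validate beyond emptiness is to reject reserved sentinel values. A standalone sketch using `java.util.UUID`; the sentinel set here is hypothetical, since the real broker defines its own reserved directory ids:

```java
import java.util.Set;
import java.util.UUID;

public class DirectoryIdValidation {
    // Hypothetical reserved sentinels (the nil UUID plus low-numbered ids);
    // real reserved directory ids depend on the broker's conventions.
    private static final Set<UUID> RESERVED = Set.of(
        new UUID(0L, 0L),   // unassigned
        new UUID(0L, 1L),   // lost
        new UUID(0L, 2L)    // migrating
    );

    static boolean isUsableDirectoryId(UUID id) {
        // Null and reserved ids are both unusable as a replica's own directory id.
        return id != null && !RESERVED.contains(id);
    }

    public static void main(String[] args) {
        System.out.println(isUsableDirectoryId(new UUID(0L, 0L))); // false: reserved
        System.out.println(isUsableDirectoryId(new UUID(42L, 7L))); // true
    }
}
```

Centralizing the reserved set in one place keeps the emptiness check and the malformed-value check from drifting apart.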
```java
private final AtomicReference<KRaftVersionUpgrade> kraftVersionUpgradeState = new AtomicReference<>(
    KRaftVersionUpgrade.empty()
);
```
Potential Race Condition
The field declaration itself is sound: a `final` field holding an `AtomicReference` is safely published, and `get()`/`compareAndSet()` already carry volatile semantics, so no extra qualifier is needed (the combination `private final volatile` would not even compile). The actual hazard is updating the reference with a plain get-then-set instead of a compare-and-set loop, which can silently drop a concurrent update.

Original:

```java
private final AtomicReference<KRaftVersionUpgrade> kraftVersionUpgradeState = new AtomicReference<>(
    KRaftVersionUpgrade.empty()
);
```

Suggested update pattern (`nextState` is an illustrative transition function):

```java
KRaftVersionUpgrade current;
do {
    current = kraftVersionUpgradeState.get();
} while (!kraftVersionUpgradeState.compareAndSet(current, nextState(current)));
```
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- SRE-Thread-Safety
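As context for the visibility concern above: a `final` `AtomicReference` needs no additional qualifier, because `get()` and `compareAndSet()` already have volatile semantics. The runnable sketch below shows two racing threads losing no updates when every write goes through a CAS loop (all names are illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;

public class AtomicRefDemo {
    // final is sufficient: get()/compareAndSet() provide the visibility and
    // atomicity; a final field is safely published after construction.
    private static final AtomicReference<Integer> STATE = new AtomicReference<>(0);

    // Two threads race 10_000 CAS-looped increments each; no update is lost.
    static int runIncrementDemo() {
        STATE.set(0);
        Runnable incrementer = () -> {
            for (int i = 0; i < 10_000; i++) {
                Integer cur;
                do {
                    cur = STATE.get();
                } while (!STATE.compareAndSet(cur, cur + 1));
            }
        };
        Thread t1 = new Thread(incrementer);
        Thread t2 = new Thread(incrementer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return STATE.get();
    }

    public static void main(String[] args) {
        System.out.println(runIncrementDemo()); // 20000
    }
}
```

A plain `STATE.set(STATE.get() + 1)` in the same test would lose updates under contention; the CAS loop is what makes the read-modify-write atomic.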
```java
AliasedPluginInfo predicateInfo = predicatePlugin != null ? new AliasedPluginInfo(
    predicateAlias,
    predicatePlugin.get().getClass().getName(), predicateVersion
) : null;
```
Missing Null Check
A NullPointerException is possible when `predicatePlugin` is non-null but `predicateAlias` is null: the `AliasedPluginInfo` constructor requires a non-null alias, and nothing validates it before the call.

Original:

```java
AliasedPluginInfo predicateInfo = predicatePlugin != null ? new AliasedPluginInfo(
    predicateAlias,
    predicatePlugin.get().getClass().getName(), predicateVersion
) : null;
```

Suggested:

```java
AliasedPluginInfo predicateInfo = (predicatePlugin != null && predicateAlias != null) ? new AliasedPluginInfo(
    predicateAlias,
    predicatePlugin.get().getClass().getName(), predicateVersion
) : null;
```
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- DbC-Precondition-Validation
```java
var successful = leaderState.compareAndSetVolatileVoters(
    inMemoryVoters.get(),
    new KRaftVersionUpgrade.Voters(newVoters)
);
```
Unchecked Exception Propagation
Unchecked exceptions thrown by `compareAndSetVolatileVoters` are not handled. A runtime failure there would propagate up the request path and could leave the system in an inconsistent state.
Original:

```java
var successful = leaderState.compareAndSetVolatileVoters(
    inMemoryVoters.get(),
    new KRaftVersionUpgrade.Voters(newVoters)
);
```

Suggested:

```java
boolean successful;
try {
    successful = leaderState.compareAndSetVolatileVoters(
        inMemoryVoters.get(),
        new KRaftVersionUpgrade.Voters(newVoters)
    );
} catch (RuntimeException e) {
    log.error("Failed to update in-memory voters", e);
    return CompletableFuture.completedFuture(
        RaftUtil.updateVoterResponse(
            Errors.UNKNOWN_SERVER_ERROR,
            requestListenerName,
            new LeaderAndEpoch(
                localId,
                leaderState.epoch()
            ),
            leaderState.leaderEndpoints()
        )
    );
}
```
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- ISO-IEC-25010-Functional-Correctness-Appropriateness
- SRE-Error-Handling
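The try/catch pattern above can be factored into a small helper that maps a runtime failure to an error code instead of letting it unwind the caller. A self-contained sketch; `applyUpdate` and the string error codes are illustrative stand-ins for the real response types:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class GuardedCasUpdate {
    // Hypothetical helper: run a state update and convert any runtime failure
    // into an error-carrying future rather than propagating the exception.
    static CompletableFuture<String> applyUpdate(Supplier<Boolean> update) {
        try {
            boolean successful = update.get();
            return CompletableFuture.completedFuture(successful ? "NONE" : "RETRY");
        } catch (RuntimeException e) {
            // The caller always gets a completed future, never a thrown exception.
            return CompletableFuture.completedFuture("UNKNOWN_SERVER_ERROR");
        }
    }

    public static void main(String[] args) {
        System.out.println(applyUpdate(() -> true).join()); // NONE
        System.out.println(applyUpdate(() -> {
            throw new IllegalStateException("boom");
        }).join()); // UNKNOWN_SERVER_ERROR
    }
}
```

Keeping the failure path inside the helper means every caller gets a uniform error response and no request-handling thread dies mid-update.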
```java
        )
    );
}
final Optional<KRaftVersionUpgrade.Voters> inMemoryVoters;
```
Optimized Version Upgrade
The current implementation rejects updates when kraft.version < 1, causing unnecessary request failures. Supporting in-memory voter updates for kraft.version 0 improves availability during version transitions.

Original:

```java
final Optional<KRaftVersionUpgrade.Voters> inMemoryVoters;
```

Suggested:

```java
final Optional<KRaftVersionUpgrade.Voters> inMemoryVoters;
final Optional<VoterSet> voters;
if (kraftVersion.isReconfigSupported()) {
    inMemoryVoters = Optional.empty();
    // Check that there are no uncommitted VotersRecord
    Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
    if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
        voters = Optional.empty();
    } else {
        voters = votersEntry.map(LogHistory.Entry::value);
    }
} else {
    inMemoryVoters = leaderState.volatileVoters();
    if (inMemoryVoters.isEmpty()) {
        /* This can happen if the remote voter sends an update voter request before the
         * updated kraft version has been written to the log
         */
        return CompletableFuture.completedFuture(
            RaftUtil.updateVoterResponse(
                Errors.REQUEST_TIMED_OUT,
                requestListenerName,
                new LeaderAndEpoch(
                    localId,
                    leaderState.epoch()
                ),
                leaderState.leaderEndpoints()
            )
        );
    }
    voters = inMemoryVoters.map(KRaftVersionUpgrade.Voters::voters);
}
```
Standards
- ISO-IEC-25010-Performance-Time-Behaviour
- Netflix-Availability-Optimization
- Google-Performance-Best-Practices
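The uncommitted-VotersRecord check above reduces to a single offset comparison against the high watermark. A minimal sketch of that invariant (the helper name is illustrative):

```java
public class CommitCheck {
    // An entry is committed once its offset falls below the high watermark,
    // since the high watermark points one past the last committed offset.
    static boolean isCommitted(long entryOffset, long highWatermark) {
        return entryOffset < highWatermark;
    }

    public static void main(String[] args) {
        System.out.println(isCommitted(41, 42)); // true: below the HW, committed
        System.out.println(isCommitted(42, 42)); // false: at or past the HW, uncommitted
    }
}
```

This matches the suggested code's `offset() >= highWatermark.get()` branch, which treats an at-or-past-HW voter-set entry as uncommitted and declines to use it.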