-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-19163: Avoid deleting groups with pending transactional offsets #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -198,12 +198,19 @@ public OffsetMetadataManager build() { | |
| private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup; | ||
|
|
||
| private class Offsets { | ||
| /** | ||
| * Whether to preserve empty entries for groups when removing offsets. | ||
| * We use this to keep track of the groups associated with pending transactions. | ||
| */ | ||
| private final boolean preserveGroups; | ||
|
|
||
| /** | ||
| * The offsets keyed by group id, topic name and partition id. | ||
| */ | ||
| private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup; | ||
|
|
||
| private Offsets() { | ||
| private Offsets(boolean preserveGroups) { | ||
| this.preserveGroups = preserveGroups; | ||
| this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); | ||
| } | ||
|
|
||
|
|
@@ -256,7 +263,7 @@ private OffsetAndMetadata remove( | |
| if (partitionOffsets.isEmpty()) | ||
| topicOffsets.remove(topic); | ||
|
|
||
| if (topicOffsets.isEmpty()) | ||
| if (!preserveGroups && topicOffsets.isEmpty()) | ||
| offsetsByGroup.remove(groupId); | ||
|
Comment on lines
+266
to
267
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Group Preservation ImplementationOriginal implementation unconditionally removes groups when topic offsets are empty, failing to preserve groups with pending transactions. New logic adds a preserveGroups flag to conditionally maintain empty groups with pending transactions. |
||
|
|
||
| return removedValue; | ||
|
|
@@ -278,7 +285,7 @@ private OffsetAndMetadata remove( | |
| this.groupMetadataManager = groupMetadataManager; | ||
| this.config = config; | ||
| this.metrics = metrics; | ||
| this.offsets = new Offsets(); | ||
| this.offsets = new Offsets(false); | ||
| this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0); | ||
| this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); | ||
| } | ||
|
|
@@ -851,7 +858,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec | |
| TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = | ||
| offsets.offsetsByGroup.get(groupId); | ||
| if (offsetsByTopic == null) { | ||
| return true; | ||
| return !openTransactionsByGroup.containsKey(groupId); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Group Deletion LogicOriginal logic incorrectly returns true (allowing group deletion) when offsetsByTopic is null, without checking for pending transactions. This could lead to data inconsistency by deleting groups with pending transactional offsets. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inconsistent Group PreservationThe code returns true (allowing group deletion) when no open transactions exist, but doesn't check for pending transactional offsets that might be deleted. This could lead to data loss if a group with deleted pending offsets is removed before transaction completion. Standards
|
||
| } | ||
|
Comment on lines
+861
to
862
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incomplete Transaction HandlingThe condition change prevents group deletion only when transactions are open, but doesn't check for pending transactional offsets. This creates inconsistency with preserveGroups flag implementation. |
||
|
|
||
| // We expect the group to exist. | ||
|
|
@@ -995,7 +1002,7 @@ public void replay( | |
| // offsets store. Pending offsets there are moved to the main store when | ||
| // the transaction is committed; or removed when the transaction is aborted. | ||
| pendingTransactionalOffsets | ||
| .computeIfAbsent(producerId, __ -> new Offsets()) | ||
| .computeIfAbsent(producerId, __ -> new Offsets(true)) | ||
| .put( | ||
| groupId, | ||
| topic, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boolean Flag Parameter
Constructor with boolean parameter creates unclear API. The boolean flag parameter makes code less readable and intent harder to understand at call sites. Consider using named factory methods or builder pattern.