Skip to content

Conversation

@arvi18
Copy link

@arvi18 arvi18 commented Apr 21, 2025

When a group has pending transactional offsets but no committed offsets,
we can accidentally delete it while cleaning up expired offsets. Add a
check to avoid this case.

Summary by CodeRabbit

  • Bug Fixes
    • Improved handling of group deletion and cleanup when pending transactional offsets exist, ensuring groups are only removed when no open transactions remain.
  • Tests
    • Added tests to verify correct cleanup behavior with deleted and pending transactional offsets, ensuring groups are preserved appropriately during ongoing transactions.

When deleting pending transactional offsets, we must preserve the list
of groups associated with the producer ID, otherwise we cannot clean up
the list of pending transactions for the group once the transaction is
committed or aborted.
When a group has pending transactional offsets but no committed offsets,
we can accidentally delete it while cleaning up expired offsets. Add a
check to avoid this case.
@arvi18
Copy link
Author

arvi18 commented Apr 21, 2025

Marking this as a draft until apache#19495 is completed, since there is a merge conflict.

@coderabbitai
Copy link

coderabbitai bot commented Apr 21, 2025

Walkthrough

The changes introduce a new preserveGroups flag in the Offsets inner class of OffsetMetadataManager, allowing differentiated handling of empty group entries between the main offsets store and the pending transactional offsets store. The flag is set to false for the main offsets and true for pending transactional offsets, affecting group removal and cleanup logic, especially in the presence of open transactions. Corresponding updates are made to the remove method and the cleanupExpiredOffsets logic. Two new test cases are added to verify that group deletion behaves correctly with pending transactional offsets.

Changes

File(s) Change Summary
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java Added preserveGroups flag to Offsets inner class; updated constructor and remove method; modified group removal and cleanup logic; updated instantiation for main and transactional offsets.
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java Added two tests: one for cleanup with deleted pending transactional offsets, and one for cleanup with only pending transactional offsets.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant OffsetMetadataManager
    participant Offsets
    participant PendingTransactionalOffsets

    Client->>OffsetMetadataManager: Commit Offset (possibly transactional)
    alt Regular Commit
        OffsetMetadataManager->>Offsets: Store offset (preserveGroups = false)
    else Transactional Commit
        OffsetMetadataManager->>PendingTransactionalOffsets: Store offset (preserveGroups = true)
    end

    Client->>OffsetMetadataManager: Cleanup Expired Offsets
    OffsetMetadataManager->>Offsets: Remove expired offsets
    OffsetMetadataManager->>PendingTransactionalOffsets: Check for open transactions
    alt No open transactions
        OffsetMetadataManager->>Offsets: Remove group if empty
    else Open transactions exist
        OffsetMetadataManager->>Offsets: Preserve group entry
    end
Loading

Poem

In the warren of offsets, a flag now appears,
To preserve or remove, it manages our fears.
With transactions still pending, the group stays in sight,
While regular offsets may vanish from night.
Tests hop along, ensuring all’s right—
In the garden of Kafka, the logic’s now tight!
🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@visz11
Copy link
Collaborator

visz11 commented Apr 21, 2025

@coderabbitai full review

@coderabbitai
Copy link

coderabbitai bot commented Apr 21, 2025

✅ Actions performed

Full review triggered.

@visz11
Copy link
Collaborator

visz11 commented Apr 22, 2025

@coderabbitai full review

@coderabbitai
Copy link

coderabbitai bot commented Apr 22, 2025

✅ Actions performed

Full review triggered.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (3)

266-268: Consider opportunistic cleanup of empty groups once transactions resolve

With preserveGroups == true we intentionally keep an empty group bucket while a transaction is open.
However, once the transaction is completed (commit/abort) we drop the entire Offsets instance from pendingTransactionalOffsets, meaning the additional empty groupId → {} entry will be garbage‑collected.
In the corner‑case where the last partition is tomb‑stoned before the transaction ends, the empty map remains alive until the TX finishes – that’s fine, but you could free it immediately by:

- if (!preserveGroups && topicOffsets.isEmpty())
+ if (( !preserveGroups || offsetsByGroup.get(groupId).isEmpty())
+     && topicOffsets.isEmpty())
     offsetsByGroup.remove(groupId);

Not critical (bounded memory) but can shorten the retention of now‑useless objects.


858-862: Early‑return now guards against open transactions – good, but update the javadoc

Changing

if (offsetsByTopic == null) {
    return !openTransactionsByGroup.containsKey(groupId);
}

prevents accidental group deletion while openTransactionsByGroup still tracks a producer.
Please update the method‑level javadoc (lines 850‑856) to mention that false can also be returned when no committed offsets exist but pending transactions are present.


894-896: Behavioural change is subtle – add metrics or TRACE log?

The final return value now short‑circuits on !openTransactionsByGroup.containsKey(groupId).
For operators it might be useful to emit a TRACE when cleanup is aborted because of pending transactions, e.g.:

if (!openTransactionsByGroup.containsKey(groupId) ) {
    log.trace("[GroupId {}] Skipping deletion – open transactional offsets still present", groupId);
}

Helps to diagnose “why does this empty group stick around?” questions.

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (1)

2658-2691: Edge‑case test for “pending‑only” group is valuable, consider clarifying comment

The new testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() verifies that even when all committed offsets have expired, the group persists while it still owns only pending transactional offsets.

Tiny nit: the inline comment has a typo – beacusebecause.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 3c0cbb0.

📒 Files selected for processing (2)
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (5 hunks)
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (1 hunks)
🔇 Additional comments (3)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (2)

200-207: Scope guard for groups with pending transactions is clear and self‑documenting

The introduction of preserveGroups explicitly documents why empty group entries must be kept around while a transaction is in flight. The in‑line Javadoc explains the intent, which is often overlooked in coordinator code – nice touch!


1005-1006: Correctly propagates the ‘preserveGroups’ flag to transactional store

Using new Offsets(true) inside the transactional path ensures the coordinator remembers the group even after all pending partitions are removed. This aligns with the cleanup logic introduced above.

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (1)

2596-2655: Test covers delete‑then‑expire regression – great addition

testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() faithfully reproduces the scenario fixed by the patch:

  1. regular commit
  2. transactional commit (pending)
  3. tomb‑stone the transactional offset
  4. ensure group is not deleted until the TX is completed

The asserts match the intended behaviour and the use of MockTime keeps the test deterministic.

@visz11
Copy link
Collaborator

visz11 commented Apr 24, 2025

/gemini review

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses a potential issue where groups with pending transactional offsets could be accidentally deleted during expired offset cleanup. The changes introduce a check to prevent this scenario and include new tests to verify the fix. Overall, the changes seem well-structured and address the problem effectively.

Summary of Findings

  • Missing Preservation of Groups: The initial Offsets class did not account for preserving groups with pending transactions. The addition of the preserveGroups flag and its usage in the remove method ensures that groups associated with pending transactions are not prematurely deleted.
  • Cleanup Logic: The cleanup logic in cleanupExpiredOffsets has been modified to check for open transactions before allowing deletion, preventing accidental removal of groups with pending transactional offsets.
  • Test Coverage: New tests have been added to cover scenarios with deleted and pending transactional offsets, ensuring that the cleanup process behaves correctly under various conditions.

Merge Readiness

The pull request addresses an important correctness issue and includes thorough testing. I recommend addressing the high severity issue before merging. I am unable to approve this pull request, and users should have others review and approve this code before merging.

@arvi18
Copy link
Author

arvi18 commented Sep 2, 2025

/refacto-test

@refacto-test
Copy link

refacto-test bot commented Sep 2, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Sep 2, 2025

Code Review: Transaction Handling in Offset Management

👍 Well Done
Preserving Group Integrity

Preventing group deletion during pending transactions improves system reliability.

Comprehensive Test Coverage

Added tests verify behavior with deleted pending transactional offsets.

📌 Files Processed
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
📝 Additional Comments
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (3)
Group Deletion Optimization

The containsKey operation on openTransactionsByGroup is called during offset cleanup. While efficient for individual lookups (O(1) complexity), this adds a conditional check on every group deletion attempt. For systems with many groups and frequent cleanup cycles, this introduces additional overhead.

Conditional Group Removal

Conditional removal logic adds complexity to offset management. The preservation logic is now tied to group removal decision, creating subtle coupling. Consider extracting group preservation strategy to improve separation of concerns.

Data Consistency Improvement

The original code unconditionally removed groups when topic offsets were empty, potentially causing data inconsistency with pending transactions. This could lead to transaction integrity issues and potential data loss.

offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactionsByGroup.containsKey(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Deletion Logic

Original logic incorrectly returns true (allowing group deletion) when offsetsByTopic is null, without checking for pending transactions. This could lead to data inconsistency by deleting groups with pending transactional offsets.

Comment on lines +266 to 267
if (!preserveGroups && topicOffsets.isEmpty())
offsetsByGroup.remove(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Preservation Implementation

Original implementation unconditionally removes groups when topic offsets are empty, failing to preserve groups with pending transactions. New logic adds a preserveGroups flag to conditionally maintain empty groups with pending transactions.

Comment on lines +861 to 862
return !openTransactionsByGroup.containsKey(groupId);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete Transaction Handling

The condition change prevents group deletion only when transactions are open, but doesn't check for pending transactional offsets. This creates inconsistency with preserveGroups flag implementation.

Comment on lines +212 to 214
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean Flag Parameter

Constructor with boolean parameter creates unclear API. The boolean flag parameter makes code less readable and intent harder to understand at call sites. Consider using named factory methods or builder pattern.

@visz11
Copy link
Collaborator

visz11 commented Sep 16, 2025

/refacto-test-arvi

@refacto-visz
Copy link

refacto-visz bot commented Sep 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-visz
Copy link

refacto-visz bot commented Sep 16, 2025

Code Review: Group Coordinator Implementation

👍 Well Done
Structured Transaction Management

The implementation provides a structured approach to managing group transactions and offset metadata.

📌 Files Processed
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactionsByGroup.containsKey(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent Group Preservation

The code returns true (allowing group deletion) when no open transactions exist, but doesn't check for pending transactional offsets that might be deleted. This could lead to data loss if a group with deleted pending offsets is removed before transaction completion.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-State-Consistency

@visz11 visz11 closed this Oct 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants