Skip to content

Conversation

@visz11
Copy link
Collaborator

@visz11 visz11 commented Oct 6, 2025

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

Summary by CodeRabbit

  • Bug Fixes

    • Improved offset cleanup behavior for groups with pending transactional offsets, preventing premature group deletion.
    • Ensures committed offset tombstones are emitted and cleaned up correctly once transactions complete.
    • Aligns transactional and non-transactional offset handling for more predictable retention and cleanup.
  • Tests

    • Added test coverage for cleanup scenarios involving deleted pending transactional offsets.
    • Added test coverage for groups with only pending transactional offsets to verify correct deletion eligibility and tombstone handling.

When deleting pending transactional offsets, we must preserve the list
of groups associated with the producer ID, otherwise we cannot clean up
the list of pending transactions for the group once the transaction is
committed or aborted.
When a group has pending transactional offsets but no committed offsets,
we can accidentally delete it while cleaning up expired offsets. Add a
check to avoid this case.
@gemini-code-assist
Copy link

Summary of Changes

Hello @visz11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue where the group coordinator could prematurely delete groups that still had ongoing transactional offset commits. By introducing a mechanism to explicitly preserve group metadata for pending transactions and updating the cleanup logic to account for these transactions, the change ensures that groups are only removed when all associated transactional activity has concluded. This enhancement maintains data consistency and prevents potential data loss in transactional offset management.

Highlights

  • Prevent premature group deletion: The group coordinator will no longer delete groups that have pending transactional offsets, even if their committed offsets have expired or been deleted, ensuring transactional integrity.
  • Introduce 'preserveGroups' flag: A new boolean flag preserveGroups has been added to the Offsets internal class. This flag controls whether empty group entries are retained, specifically to track groups with pending transactions.
  • Refined offset cleanup logic: The cleanupExpiredOffsets method has been updated to explicitly check for open transactions before allowing a group to be fully removed, preventing accidental data loss during cleanup.
  • New test cases: Two new unit tests, testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets and testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly, were added to validate the correct behavior of group cleanup in various transactional offset scenarios.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@refacto-test
Copy link

refacto-test bot commented Oct 6, 2025

Clone squash fix group coordinator deleting groups with pending transactional offsets

TL;DR: Prevents group deletion when pending transactional offsets exist, ensuring data consistency during transactions.


Refacto PR Summary

Fixes a critical bug in the OffsetMetadataManager that was prematurely deleting groups with pending transactional offsets.
The implementation adds a new preserveGroups flag to the Offsets class to track groups associated with pending transactions. This prevents the coordinator from deleting groups that still have open transactions, even when their regular offsets are removed. The fix ensures transaction atomicity by keeping empty group entries until their associated transactions are either committed or aborted, avoiding potential data loss or inconsistency.

Change Highlights

Click to expand
  • OffsetMetadataManager.java: Added preserveGroups flag to Offsets class to prevent group deletion during transactions
  • OffsetMetadataManager.java: Modified cleanupExpiredOffsets to check for open transactions before deleting groups
  • OffsetMetadataManager.java: Updated constructor for Offsets to support preserving empty groups
  • OffsetMetadataManager.java: Fixed group deletion logic in remove method to respect the preserveGroups flag
  • OffsetMetadataManagerTest.java: Added tests for expired offsets with pending and deleted transactional offsets

Sequence Diagram

sequenceDiagram
    participant Client as Client
    participant Coordinator as GroupCoordinator
    participant OffsetManager as OffsetMetadataManager
    participant Storage as Storage
    
    Client->>Coordinator: Begin Transaction (producerId=10)
    Coordinator->>OffsetManager: commitOffset(transactional)
    OffsetManager->>OffsetManager: Store in pendingTransactionalOffsets with preserveGroups=true
    Client->>Coordinator: Delete regular offsets
    Coordinator->>OffsetManager: deleteOffsets(groupId)
    OffsetManager->>Storage: Delete regular offsets
    Note over OffsetManager: Group retained due to pending transaction
    Client->>Coordinator: Commit/Abort Transaction
    Coordinator->>OffsetManager: replayEndTransactionMarker
    OffsetManager->>OffsetManager: Process transaction completion
    Note over OffsetManager: Group can now be deleted if empty
Loading

Testing Guide

Click to expand
  1. Create a consumer group with both regular and transactional offsets
  2. Delete the regular offsets and verify the group is still maintained due to pending transaction
  3. Verify that cleanupExpiredOffsets doesn't delete the group while transaction is pending
  4. Complete the transaction (commit or abort) and verify group can now be deleted if empty
  5. Test edge case where transactional offsets are deleted but transaction is still open

@visz11
Copy link
Collaborator Author

visz11 commented Oct 6, 2025

/refacto-bot

@coderabbitai
Copy link

coderabbitai bot commented Oct 6, 2025

Pre-merge checks and finishing touches

❌ Failed checks (3 warnings)
Check name Status Explanation Resolution
Title Check ⚠️ Warning The title contains branch‐specific clutter and a typo (“Clone squah”) and does not clearly convey the core change of introducing a preserveGroups flag to adjust when group entries are deleted in the presence of pending transactional offsets. It fails to succinctly summarize the main behavioral change from the diff. Please update the title to clearly and concisely reflect the primary change, for example “Add preserveGroups flag to avoid deleting groups with pending transactional offsets,” and remove branch or typo artifacts.
Description Check ⚠️ Warning The pull request description is still the default placeholder template and does not include any details about the change, its rationale, or the testing strategy. It fails to provide the required summary of behavior changes and added tests. Replace the placeholder text with a detailed description covering the purpose of the preserveGroups flag, how it alters offset cleanup behavior for transactional offsets, and include a summary of the new unit tests and overall testing approach as specified by the repository template.
Docstring Coverage ⚠️ Warning Docstring coverage is 22.22% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-squah-fix-group-coordinator-deleting-groups-with-pending-transactional-offsets

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 3c0cbb0.

📒 Files selected for processing (2)
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (5 hunks)
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (1)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java (1)
  • GroupCoordinatorRecordHelpers (72-902)
🔇 Additional comments (8)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (6)

201-205: LGTM! Well-documented design addition.

The preserveGroups flag clearly documents its purpose and enables tracking groups with pending transactions even after all their offsets are removed.


212-215: LGTM! Constructor change aligns with the new design.

Changing from a no-arg constructor to a parameterized one is appropriate for configuring the preservation behavior. Since the class is private, this is a safe internal API change.


266-266: LGTM! Correct implementation of the preserveGroups logic.

The conditional removal correctly preserves empty group entries when preserveGroups=true, which is essential for maintaining accurate open transaction tracking in openTransactionsByGroup.


288-288: LGTM! Correct instantiation for committed offsets.

Regular committed offsets should allow empty groups to be removed (standard cleanup), so preserveGroups=false is correct.


860-861: LGTM! This is the core fix for preventing premature group deletion.

When no committed offsets exist (offsetsByTopic == null), the method now correctly checks for pending transactions before declaring a group deletable. This prevents groups from being deleted while they have ongoing transactions, even if all their committed offsets have been removed.


1005-1005: LGTM! Correct instantiation for pending transactional offsets.

Setting preserveGroups=true for transactional offsets ensures that group entries remain in the map even when all offsets are removed (e.g., via tombstones). This is critical for maintaining the accuracy of openTransactionsByGroup until transactions complete.

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (2)

2596-2655: LGTM! Comprehensive test for tombstone interaction with pending transactions.

This test thoroughly verifies:

  • Tombstones for pending transactional offsets don't lose track of open transactions
  • Groups are not deleted while transactions remain open (line 2642)
  • After transaction completion, groups can be deleted normally (line 2653)

The test scenario is well-structured and validates the core fix.


2657-2691: LGTM! Clear test for groups with only pending transactional offsets.

This test validates:

  • Committed offsets can expire and be deleted while pending transactions exist
  • Groups remain non-deletable until transactions complete (lines 2684, 2689)
  • Multiple cleanupExpiredOffsets calls handle the state correctly

The scenario is straightforward and complements the previous test well.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@refacto-test
Copy link

refacto-test bot commented Oct 6, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request fixes a bug where the group coordinator could prematurely delete a group that still had pending transactional offsets. The fix correctly prevents group cleanup if open transactions exist and ensures that the internal data structures for pending offsets preserve group information, even if all pending offsets for that group are deleted within the transaction. The production code changes are sound and well-targeted. The new tests validate the fix, but I've suggested a simplification for one of them to improve clarity and maintainability.

Comment on lines +2648 to +2654
// The group should be deletable now.
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.time.sleep(Duration.ofMinutes(1).toMillis());

records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The second half of this test is a bit confusing. After the transaction is committed on line 2646, the group should be deletable. However, the test then re-commits and re-expires the same offset to verify this. This is redundant and makes the test harder to understand.

A simpler approach would be to directly assert that cleanupExpiredOffsets returns true right after the transaction is committed. Since no more offsets are left to expire, the records list should be empty. This would more clearly test the condition for the group being deletable.

Suggested change
// The group should be deletable now.
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.time.sleep(Duration.ofMinutes(1).toMillis());
records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);
// The group should be deletable now as there are no more offsets and no pending transactions.
records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
// No more offsets are expired, so the records list should be empty.
assertEquals(List.of(), records);

@refacto-test
Copy link

refacto-test bot commented Oct 6, 2025

Code Review: Transaction Management in Offset Handling

👍 Well Done
Preserving Group Metadata

Properly preserving groups with pending transactions prevents data loss and improves reliability.

Comprehensive Test Coverage

Added tests thoroughly validate transaction handling edge cases with pending offsets.

📌 Files Processed
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
📝 Additional Comments
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (5)
Consistent Preservation Logic

The comment explains the purpose but the implementation could be more robust with explicit validation. Consider adding runtime assertions to verify preservation logic is working correctly during state transitions.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
Conditional Removal Logic

The conditional removal logic introduces a subtle dependency between group preservation and transaction state. Consider extracting this decision logic into a descriptive method that encapsulates when groups should be preserved, improving future maintainability.

Standards:

  • Clean-Code-Functions
  • SOLID-SRP
  • Maintainability-Quality-Explicitness
Memory Usage Optimization

The Offsets class now stores an additional boolean flag for every instance. While the memory impact is minimal per instance, this could increase memory usage in systems with many offset instances. Consider evaluating the memory-performance tradeoff in high-scale deployments.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Resource-Utilization
  • Memory-Allocation-Optimization
  • Data-Structure-Efficiency
Group Deletion Control

The updated check prevents group deletion when pending transactions exist but relies on proper maintenance of openTransactionsByGroup. If this map becomes inconsistent, groups with pending transactions could be incorrectly deleted, potentially causing data loss or transaction failures.

Standards:

  • CWE-662
  • OWASP-A04
Transaction State Preservation

The new preserveGroups flag improves transaction integrity by preventing group deletion during active transactions. Consider adding validation or assertions when this flag is used to ensure it's properly initialized, preventing potential transaction integrity issues if misconfigured.

Standards:

  • CWE-665
  • OWASP-A04

offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactionsByGroup.containsKey(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Deletion Logic

The logic only checks openTransactionsByGroup but doesn't verify if there are pending transactional offsets for the group. This could lead to data loss if a group has pending transactional offsets but no open transactions.

            return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Commitable Suggestion
Suggested change
return !openTransactionsByGroup.containsKey(groupId);
return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • DbC-State-Consistency

offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactionsByGroup.containsKey(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Deletion Logic

The original code unconditionally returns true when offsetsByTopic is null, indicating the group can be deleted. This is incorrect when the group has pending transactional offsets, as it would lead to premature group deletion. The fix correctly checks if there are open transactions for the group.

            return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Commitable Suggestion
Suggested change
return !openTransactionsByGroup.containsKey(groupId);
return !openTransactionsByGroup.containsKey(groupId) && !hasPendingTransactionalOffsets(groupId);
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Control-Flow
  • Algorithm-Correctness-Condition-Logic

topicOffsets.remove(topic);

if (topicOffsets.isEmpty())
if (!preserveGroups && topicOffsets.isEmpty())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Removal Condition

The original code unconditionally removes groups when their topic offsets are empty, ignoring pending transactions. The fix adds a conditional check (!preserveGroups) to prevent removing groups that have pending transactional offsets, maintaining data consistency during transaction processing.

            if (!preserveGroups && topicOffsets.isEmpty())
Commitable Suggestion
Suggested change
if (!preserveGroups && topicOffsets.isEmpty())
if (!preserveGroups && topicOffsets.isEmpty())
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Control-Flow
  • Algorithm-Correctness-Condition-Logic

// the transaction is committed; or removed when the transaction is aborted.
pendingTransactionalOffsets
.computeIfAbsent(producerId, __ -> new Offsets())
.computeIfAbsent(producerId, __ -> new Offsets(true))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transactional Offsets Handling

The original code creates new Offsets instances without preserving empty groups for pending transactions. The fix creates Offsets with preserveGroups=true for pending transactional offsets, ensuring groups with pending transactions aren't prematurely deleted when offsets are removed.

                    .computeIfAbsent(producerId, __ -> new Offsets(true))
Commitable Suggestion
Suggested change
.computeIfAbsent(producerId, __ -> new Offsets(true))
.computeIfAbsent(producerId, __ -> new Offsets(true))
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Data-Flow
  • Algorithm-Correctness-Parameter-Logic

this.config = config;
this.metrics = metrics;
this.offsets = new Offsets();
this.offsets = new Offsets(false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default Offsets Configuration

The original code creates the main offsets store without explicitly setting group preservation behavior. The fix creates Offsets with preserveGroups=false for the main store, making the behavior explicit and consistent with the intended design where empty groups should be removed from the main store.

        this.offsets = new Offsets(false);
Commitable Suggestion
Suggested change
this.offsets = new Offsets(false);
this.offsets = new Offsets(false);
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Data-Flow
  • Algorithm-Correctness-Parameter-Logic

Comment on lines +212 to 214
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Preservation Logic

The original Offsets constructor lacks a parameter to control group preservation behavior. This leads to inconsistent handling of groups with pending transactions. The fix adds a preserveGroups parameter to explicitly control whether empty groups should be preserved during offset removal.

        private Offsets(boolean preserveGroups) {
            this.preserveGroups = preserveGroups;
            this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
        }

        /**
         * Default constructor that doesn't preserve empty groups.
         */
        private Offsets() {
            this(false);
        }
Commitable Suggestion
Suggested change
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
* Default constructor that doesn't preserve empty groups.
*/
private Offsets() {
this(false);
}
Standards
  • Business-Rule-State-Consistency
  • Logic-Verification-Data-Flow
  • Algorithm-Correctness-Parameter-Logic

offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactionsByGroup.containsKey(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inefficient Group Check

The group existence check has been modified to check openTransactionsByGroup which is more expensive than the previous empty check. This operation now requires a hash lookup for every group check, potentially affecting performance in high-throughput scenarios.

Standards
  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Optimization-Pattern-Conditional-Efficiency
  • Algorithmic-Complexity-Lookup-Performance

Comment on lines +212 to +213
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean Parameter Clarity

The boolean parameter creates a primitive obsession code smell. Consider creating an explicit enum or constant for the preserveGroups parameter to improve code readability and make the intention clearer at instantiation sites.

Standards
  • Clean-Code-Parameters
  • Refactoring-Replace-Primitive
  • Maintainability-Quality-Readability

this.config = config;
this.metrics = metrics;
this.offsets = new Offsets();
this.offsets = new Offsets(false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default Parameter Value

Using a literal false value without context makes the code's intent unclear. Consider creating a named constant like DEFAULT_PRESERVE_GROUPS = false to document the default behavior and improve code readability.

Standards
  • Clean-Code-Constants
  • Maintainability-Quality-Self-Documentation
  • Clean-Code-Naming

offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactionsByGroup.containsKey(groupId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transaction State Check

The negated condition makes the logic harder to follow. Consider extracting this check into a descriptive method like hasOpenTransactions(groupId) and inverting the logic to make the intent clearer and improve maintainability.

Standards
  • Clean-Code-Functions
  • Maintainability-Quality-Readability
  • Clean-Code-Conditionals

// the transaction is committed; or removed when the transaction is aborted.
pendingTransactionalOffsets
.computeIfAbsent(producerId, __ -> new Offsets())
.computeIfAbsent(producerId, __ -> new Offsets(true))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preserve Flag Documentation

The true literal lacks context at the call site. Consider creating a named constant like PRESERVE_GROUPS_FOR_TRANSACTIONS = true to document the behavior and improve maintainability when reading transactional offset handling code.

Standards
  • Clean-Code-Constants
  • Maintainability-Quality-Self-Documentation
  • Clean-Code-Naming

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants