Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,19 @@ public OffsetMetadataManager build() {
private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

private class Offsets {
/**
* Whether to preserve empty entries for groups when removing offsets.
* We use this to keep track of the groups associated with pending transactions.
*/
private final boolean preserveGroups;

/**
* The offsets keyed by group id, topic name and partition id.
*/
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;

private Offsets() {
private Offsets(boolean preserveGroups) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential Resource Exhaustion

Preserving empty group entries indefinitely could lead to memory exhaustion. Attackers could create numerous groups with pending transactions, depleting coordinator resources.

Suggested change
private Offsets(boolean preserveGroups) {
this.preserveGroups = preserveGroups;
// TODO: Add monitoring and limits for preserved empty groups to prevent resource exhaustion
Standards
  • CWE-400
  • OWASP-A04

this.preserveGroups = preserveGroups;
this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
Comment on lines +212 to 214
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uninitialized Flag Parameter

The Offsets constructor was modified to accept a preserveGroups parameter but the original constructor call at line 288 passes false without explanation. This could lead to inconsistent group preservation behavior.

Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Fault-Tolerance

Comment on lines +201 to 214
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Insufficient Constructor Documentation

The constructor parameter is documented but lacks explanation of when to use true/false values and their implications. This makes future modifications risky as developers may not understand the full impact of this parameter.

Standards
  • Clean-Code-Documentation
  • Clean-Code-Naming

Comment on lines +212 to 214
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boolean Parameter Clarity

Boolean parameter lacks semantic clarity, making intention hard to understand from call sites. Consider using a more descriptive enum or builder pattern to make the parameter's purpose self-documenting.

Standards
  • Clean-Code-Meaningful-Names
  • Clean-Code-Functions

}

Expand Down Expand Up @@ -256,7 +263,7 @@ private OffsetAndMetadata remove(
if (partitionOffsets.isEmpty())
topicOffsets.remove(topic);

if (topicOffsets.isEmpty())
if (!preserveGroups && topicOffsets.isEmpty())
offsetsByGroup.remove(groupId);
Comment on lines +266 to 267
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Preservation Overhead

The preserveGroups flag controls whether empty group entries are retained in memory. For transactional offsets, this causes unnecessary memory retention as groups with deleted offsets remain in the map until transaction completion.

Suggested change
if (!preserveGroups && topicOffsets.isEmpty())
offsetsByGroup.remove(groupId);
if (!preserveGroups && topicOffsets.isEmpty())
offsetsByGroup.remove(groupId);
else if (preserveGroups && topicOffsets.isEmpty() && !openTransactionsByGroup.containsKey(groupId))
offsetsByGroup.remove(groupId);
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Algorithm-Opt-Memory-Management

Comment on lines +266 to 267
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group Preservation Logic

The original code incorrectly removed groups with empty topic offsets even when they had pending transactions. This could cause premature group deletion and data inconsistency when transactions complete later.

Suggested change
if (!preserveGroups && topicOffsets.isEmpty())
offsetsByGroup.remove(groupId);
if (!preserveGroups && topicOffsets.isEmpty())
offsetsByGroup.remove(groupId);
Standards
  • Business-Rule-Validation
  • Logic-Verification-Completeness
  • Algorithm-Correctness-State-Preservation


return removedValue;
Expand All @@ -278,7 +285,7 @@ private OffsetAndMetadata remove(
this.groupMetadataManager = groupMetadataManager;
this.config = config;
this.metrics = metrics;
this.offsets = new Offsets();
this.offsets = new Offsets(false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Constructor Parameter

The Offsets constructor is modified to take a boolean parameter, but the main offsets instance is initialized with the parameterless constructor which no longer exists. This would cause compilation failure since the constructor signature has changed.

Standards
  • Algorithm-Correctness-Parameter-Validation
  • Logic-Verification-Constructor-Consistency

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Group Preservation

Main offsets store initialized with preserveGroups=false while pending transactional offsets use true. Inconsistent behavior could lead to premature group deletion.

Suggested change
this.offsets = new Offsets(false);
this.offsets = new Offsets(true);
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
Expand Down Expand Up @@ -995,7 +1002,7 @@ public void replay(
// offsets store. Pending offsets there are moved to the main store when
// the transaction is committed; or removed when the transaction is aborted.
pendingTransactionalOffsets
.computeIfAbsent(producerId, __ -> new Offsets())
.computeIfAbsent(producerId, __ -> new Offsets(true))
.put(
groupId,
topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2593,6 +2593,67 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {
assertEquals(List.of(), records);
}

@Test
public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
Group group = mock(Group.class);

OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
.withOffsetsRetentionMinutes(1)
.build();

long commitTimestamp = context.time.milliseconds();

context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);

when(groupMetadataManager.group("group-id")).thenReturn(group);
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
when(group.isSubscribedToTopic("foo")).thenReturn(false);

// Delete the pending transactional offset.
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List.of(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName("foo")
.setPartitions(List.of(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1)
))
).iterator());
CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> result = context.deleteOffsets(
new OffsetDeleteRequestData()
.setGroupId("group-id")
.setTopics(requestTopicCollection)
);
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1)
);
assertEquals(expectedRecords, result.records());

context.time.sleep(Duration.ofMinutes(1).toMillis());

// The group should not be deleted because it has a pending transaction.
expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
);
List<CoordinatorRecord> records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);

// Commit the ongoing transaction.
context.replayEndTransactionMarker(10L, TransactionResult.COMMIT);

// The group should be deletable now.
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
context.time.sleep(Duration.ofMinutes(1).toMillis());

records = new ArrayList<>();
assertTrue(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);
}

private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
int partition,
long offset,
Expand Down