diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 2b50071a7f771..60b88dbbfb516 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -194,9 +194,16 @@ public OffsetMetadataManager build() { /** * The open transactions (producer ids) keyed by group. + * Tracks whether groups have any open transactions. */ private final TimelineHashMap> openTransactionsByGroup; + /** + * The open transactions (producer ids) keyed by group id, topic name and partition id. + * Tracks whether partitions have any pending transactional offsets. + */ + private final TimelineHashMap>>> openTransactionsByGroupTopicAndPartition; + private class Offsets { /** * The offsets keyed by group id, topic name and partition id. @@ -281,6 +288,7 @@ private OffsetAndMetadata remove( this.offsets = new Offsets(); this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0); this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + this.openTransactionsByGroupTopicAndPartition = new TimelineHashMap<>(snapshotRegistry, 0); } /** @@ -650,24 +658,18 @@ public int deleteAllOffsets( // Delete all the pending transactional offsets too. Here we only write a tombstone // if the topic-partition was not in the main storage because we don't need to write // two consecutive tombstones. - TimelineHashSet openTransactions = openTransactionsByGroup.get(groupId); - if (openTransactions != null) { - openTransactions.forEach(producerId -> { - Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId); - if (pendingOffsets != null) { - TimelineHashMap> pendingGroupOffsets = - pendingOffsets.offsetsByGroup.get(groupId); - if (pendingGroupOffsets != null) { - pendingGroupOffsets.forEach((topic, offsetsByPartition) -> { - offsetsByPartition.keySet().forEach(partition -> { - if (!hasCommittedOffset(groupId, topic, partition)) { - records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)); - numDeletedOffsets.getAndIncrement(); - } - }); - }); - } - } + TimelineHashMap>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic != null) { + openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> { + openTransactionsByPartition.forEach((partition, producerIds) -> { + producerIds.forEach(producerId -> { + if (!hasCommittedOffset(groupId, topic, partition)) { + records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)); + numDeletedOffsets.getAndIncrement(); + } + }); + }); }); } @@ -685,17 +687,15 @@ boolean hasPendingTransactionalOffsets( String topic, int partition ) { - final TimelineHashSet openTransactions = openTransactionsByGroup.get(groupId); - if (openTransactions == null) return false; + TimelineHashMap>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic == null) return false; - for (Long producerId : openTransactions) { - Offsets offsets = pendingTransactionalOffsets.get(producerId); - if (offsets != null && offsets.get(groupId, topic, partition) != null) { - return true; - } - } + TimelineHashMap> openTransactionsByPartition = openTransactionsByTopic.get(topic); + if (openTransactionsByPartition == null) return false; - return false; + TimelineHashSet openTransactions = openTransactionsByPartition.get(partition); + return openTransactions != null && !openTransactions.isEmpty(); } /** @@ -1005,6 +1005,11 @@ public void replay( openTransactionsByGroup .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) .add(producerId); + openTransactionsByGroupTopicAndPartition + .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1)) + .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1)) + .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) + .add(producerId); } } else { if (offsets.remove(groupId, topic, partition) != null) { @@ -1012,14 +1017,29 @@ public void replay( } // Remove all the pending offset commits related to the tombstone. - TimelineHashSet openTransactions = openTransactionsByGroup.get(groupId); - if (openTransactions != null) { - openTransactions.forEach(openProducerId -> { - Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId); - if (pendingOffsets != null) { - pendingOffsets.remove(groupId, topic, partition); + TimelineHashMap>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic != null) { + TimelineHashMap> openTransactionsByPartition = openTransactionsByTopic.get(topic); + if (openTransactionsByPartition != null) { + TimelineHashSet openTransactions = openTransactionsByPartition.get(partition); + if (openTransactions != null) { + openTransactions.forEach(openProducerId -> { + Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId); + if (pendingOffsets != null) { + pendingOffsets.remove(groupId, topic, partition); + } + }); + + openTransactionsByPartition.remove(partition); + if (openTransactionsByPartition.isEmpty()) { + openTransactionsByTopic.remove(topic); + } + if (openTransactionsByTopic.isEmpty()) { + openTransactionsByGroupTopicAndPartition.remove(groupId); + } } - }); + } } } } @@ -1031,6 +1051,7 @@ public void replay( * @param result The result of the transaction. * @throws RuntimeException if the transaction can not be completed. */ + @SuppressWarnings("NPathComplexity") public void replayEndTransactionMarker( long producerId, TransactionResult result @@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker( return; } - pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> { - TimelineHashSet openTransactions = openTransactionsByGroup.get(groupId); - if (openTransactions != null) { - openTransactions.remove(producerId); - if (openTransactions.isEmpty()) { + pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { + TimelineHashSet groupTransactions = openTransactionsByGroup.get(groupId); + if (groupTransactions != null) { + groupTransactions.remove(producerId); + if (groupTransactions.isEmpty()) { openTransactionsByGroup.remove(groupId); } } + + TimelineHashMap>> openTransactionsByTopic = + openTransactionsByGroupTopicAndPartition.get(groupId); + if (openTransactionsByTopic == null) return; + + topicOffsets.forEach((topic, partitionOffsets) -> { + TimelineHashMap> openTransactionsByPartition = openTransactionsByTopic.get(topic); + if (openTransactionsByPartition == null) return; + + partitionOffsets.keySet().forEach(partitionId -> { + TimelineHashSet partitionTransactions = openTransactionsByPartition.get(partitionId); + if (partitionTransactions != null) { + partitionTransactions.remove(producerId); + if (partitionTransactions.isEmpty()) { + openTransactionsByPartition.remove(partitionId); + } + if (openTransactionsByPartition.isEmpty()) { + openTransactionsByTopic.remove(topic); + } + if (openTransactionsByTopic.isEmpty()) { + openTransactionsByGroupTopicAndPartition.remove(groupId); + } + } + }); + }); }); if (result == TransactionResult.COMMIT) {