@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {
194194
195195 /**
196196 * The open transactions (producer ids) keyed by group.
197+ * Tracks whether groups have any open transactions.
197198 */
198199 private final TimelineHashMap <String , TimelineHashSet <Long >> openTransactionsByGroup ;
199200
201+ /**
202+ * The open transactions (producer ids) keyed by group id, topic name and partition id.
203+ * Tracks whether partitions have any pending transactional offsets.
204+ */
205+ private final TimelineHashMap <String , TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>>> openTransactionsByGroupTopicAndPartition ;
206+
200207 private class Offsets {
201208 /**
202209 * The offsets keyed by group id, topic name and partition id.
@@ -281,6 +288,7 @@ private OffsetAndMetadata remove(
281288 this .offsets = new Offsets ();
282289 this .pendingTransactionalOffsets = new TimelineHashMap <>(snapshotRegistry , 0 );
283290 this .openTransactionsByGroup = new TimelineHashMap <>(snapshotRegistry , 0 );
291+ this .openTransactionsByGroupTopicAndPartition = new TimelineHashMap <>(snapshotRegistry , 0 );
284292 }
285293
286294 /**
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
650658 // Delete all the pending transactional offsets too. Here we only write a tombstone
651659 // if the topic-partition was not in the main storage because we don't need to write
652660 // two consecutive tombstones.
653- TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
654- if (openTransactions != null ) {
655- openTransactions .forEach (producerId -> {
656- Offsets pendingOffsets = pendingTransactionalOffsets .get (producerId );
657- if (pendingOffsets != null ) {
658- TimelineHashMap <String , TimelineHashMap <Integer , OffsetAndMetadata >> pendingGroupOffsets =
659- pendingOffsets .offsetsByGroup .get (groupId );
660- if (pendingGroupOffsets != null ) {
661- pendingGroupOffsets .forEach ((topic , offsetsByPartition ) -> {
662- offsetsByPartition .keySet ().forEach (partition -> {
663- if (!hasCommittedOffset (groupId , topic , partition )) {
664- records .add (GroupCoordinatorRecordHelpers .newOffsetCommitTombstoneRecord (groupId , topic , partition ));
665- numDeletedOffsets .getAndIncrement ();
666- }
667- });
668- });
669- }
670- }
661+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
662+ openTransactionsByGroupTopicAndPartition .get (groupId );
663+ if (openTransactionsByTopic != null ) {
664+ openTransactionsByTopic .forEach ((topic , openTransactionsByPartition ) -> {
665+ openTransactionsByPartition .forEach ((partition , producerIds ) -> {
666+ producerIds .forEach (producerId -> {
667+ if (!hasCommittedOffset (groupId , topic , partition )) {
668+ records .add (GroupCoordinatorRecordHelpers .newOffsetCommitTombstoneRecord (groupId , topic , partition ));
669+ numDeletedOffsets .getAndIncrement ();
670+ }
671+ });
672+ });
671673 });
672674 }
673675
@@ -685,17 +687,15 @@ boolean hasPendingTransactionalOffsets(
685687 String topic ,
686688 int partition
687689 ) {
688- final TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
689- if (openTransactions == null ) return false ;
690+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
691+ openTransactionsByGroupTopicAndPartition .get (groupId );
692+ if (openTransactionsByTopic == null ) return false ;
690693
691- for (Long producerId : openTransactions ) {
692- Offsets offsets = pendingTransactionalOffsets .get (producerId );
693- if (offsets != null && offsets .get (groupId , topic , partition ) != null ) {
694- return true ;
695- }
696- }
694+ TimelineHashMap <Integer , TimelineHashSet <Long >> openTransactionsByPartition = openTransactionsByTopic .get (topic );
695+ if (openTransactionsByPartition == null ) return false ;
697696
698- return false ;
697+ TimelineHashSet <Long > openTransactions = openTransactionsByPartition .get (partition );
698+ return openTransactions != null && !openTransactions .isEmpty ();
699699 }
700700
701701 /**
@@ -1005,21 +1005,41 @@ public void replay(
10051005 openTransactionsByGroup
10061006 .computeIfAbsent (groupId , __ -> new TimelineHashSet <>(snapshotRegistry , 1 ))
10071007 .add (producerId );
1008+ openTransactionsByGroupTopicAndPartition
1009+ .computeIfAbsent (groupId , __ -> new TimelineHashMap <>(snapshotRegistry , 1 ))
1010+ .computeIfAbsent (topic , __ -> new TimelineHashMap <>(snapshotRegistry , 1 ))
1011+ .computeIfAbsent (partition , __ -> new TimelineHashSet <>(snapshotRegistry , 1 ))
1012+ .add (producerId );
10081013 }
10091014 } else {
10101015 if (offsets .remove (groupId , topic , partition ) != null ) {
10111016 metrics .decrementNumOffsets ();
10121017 }
10131018
10141019 // Remove all the pending offset commits related to the tombstone.
1015- TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
1016- if (openTransactions != null ) {
1017- openTransactions .forEach (openProducerId -> {
1018- Offsets pendingOffsets = pendingTransactionalOffsets .get (openProducerId );
1019- if (pendingOffsets != null ) {
1020- pendingOffsets .remove (groupId , topic , partition );
1020+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
1021+ openTransactionsByGroupTopicAndPartition .get (groupId );
1022+ if (openTransactionsByTopic != null ) {
1023+ TimelineHashMap <Integer , TimelineHashSet <Long >> openTransactionsByPartition = openTransactionsByTopic .get (topic );
1024+ if (openTransactionsByPartition != null ) {
1025+ TimelineHashSet <Long > openTransactions = openTransactionsByPartition .get (partition );
1026+ if (openTransactions != null ) {
1027+ openTransactions .forEach (openProducerId -> {
1028+ Offsets pendingOffsets = pendingTransactionalOffsets .get (openProducerId );
1029+ if (pendingOffsets != null ) {
1030+ pendingOffsets .remove (groupId , topic , partition );
1031+ }
1032+ });
1033+
1034+ openTransactionsByPartition .remove (partition );
1035+ if (openTransactionsByPartition .isEmpty ()) {
1036+ openTransactionsByTopic .remove (topic );
1037+ }
1038+ if (openTransactionsByTopic .isEmpty ()) {
1039+ openTransactionsByGroupTopicAndPartition .remove (groupId );
1040+ }
10211041 }
1022- });
1042+ }
10231043 }
10241044 }
10251045 }
@@ -1031,6 +1051,7 @@ public void replay(
10311051 * @param result The result of the transaction.
10321052 * @throws RuntimeException if the transaction can not be completed.
10331053 */
1054+ @ SuppressWarnings ("NPathComplexity" )
10341055 public void replayEndTransactionMarker (
10351056 long producerId ,
10361057 TransactionResult result
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
10431064 return ;
10441065 }
10451066
1046- pendingOffsets .offsetsByGroup .keySet (). forEach (groupId -> {
1047- TimelineHashSet <Long > openTransactions = openTransactionsByGroup .get (groupId );
1048- if (openTransactions != null ) {
1049- openTransactions .remove (producerId );
1050- if (openTransactions .isEmpty ()) {
1067+ pendingOffsets .offsetsByGroup .forEach (( groupId , topicOffsets ) -> {
1068+ TimelineHashSet <Long > groupTransactions = openTransactionsByGroup .get (groupId );
1069+ if (groupTransactions != null ) {
1070+ groupTransactions .remove (producerId );
1071+ if (groupTransactions .isEmpty ()) {
10511072 openTransactionsByGroup .remove (groupId );
10521073 }
10531074 }
1075+
1076+ TimelineHashMap <String , TimelineHashMap <Integer , TimelineHashSet <Long >>> openTransactionsByTopic =
1077+ openTransactionsByGroupTopicAndPartition .get (groupId );
1078+ if (openTransactionsByTopic == null ) return ;
1079+
1080+ topicOffsets .forEach ((topic , partitionOffsets ) -> {
1081+ TimelineHashMap <Integer , TimelineHashSet <Long >> openTransactionsByPartition = openTransactionsByTopic .get (topic );
1082+ if (openTransactionsByPartition == null ) return ;
1083+
1084+ partitionOffsets .keySet ().forEach (partitionId -> {
1085+ TimelineHashSet <Long > partitionTransactions = openTransactionsByPartition .get (partitionId );
1086+ if (partitionTransactions != null ) {
1087+ partitionTransactions .remove (producerId );
1088+ if (partitionTransactions .isEmpty ()) {
1089+ openTransactionsByPartition .remove (partitionId );
1090+ }
1091+ if (openTransactionsByPartition .isEmpty ()) {
1092+ openTransactionsByTopic .remove (topic );
1093+ }
1094+ if (openTransactionsByTopic .isEmpty ()) {
1095+ openTransactionsByGroupTopicAndPartition .remove (groupId );
1096+ }
1097+ }
1098+ });
1099+ });
10541100 });
10551101
10561102 if (result == TransactionResult .COMMIT ) {
0 commit comments