77import org .apache .kafka .common .errors .RetriableException ;
88
99import java .time .Duration ;
10- import java .util .HashMap ;
11- import java .util .HashSet ;
12- import java .util .Map ;
13- import java .util .Set ;
10+ import java .util .*;
1411
15- import static java .util .Comparator .comparing ;
16- import static java .util .function .BinaryOperator .maxBy ;
1712import static java .util .stream .Collectors .toSet ;
1813
1914abstract class AbstractCommitPolicy <K , V > implements CommitPolicy <K , V > {
@@ -49,81 +44,118 @@ void onError(RetriableException e) {
4944 }
5045 }
5146
52- final Map <TopicPartition , Long > topicOffsetHighWaterMark ;
53- final Map <TopicPartition , OffsetAndMetadata > completedTopicOffsets ;
5447 protected final Consumer <K , V > consumer ;
48+ private final Map <TopicPartition , Long > topicOffsetHighWaterMark ;
49+ private final Map <TopicPartition , CompletedOffsets > completedOffsets ;
5550 private final long syncCommitRetryIntervalMs ;
5651 private final int maxAttemptsForEachSyncCommit ;
5752
/**
 * Creates a commit policy bound to the given consumer.
 *
 * @param consumer                     the consumer whose offsets this policy commits
 * @param syncCommitRetryInterval      retry interval for synchronous commits; stored in milliseconds
 * @param maxAttemptsForEachSyncCommit attempt limit applied to each synchronous commit
 */
AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
    // Retry configuration.
    this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
    this.syncCommitRetryIntervalMs = syncCommitRetryInterval.toMillis();

    // Per-partition bookkeeping starts out empty.
    this.completedOffsets = new HashMap<>();
    this.topicOffsetHighWaterMark = new HashMap<>();

    this.consumer = consumer;
}
6560
6661 @ Override
6762 public void markPendingRecord (ConsumerRecord <K , V > record ) {
63+ final TopicPartition topicPartition = new TopicPartition (record .topic (), record .partition ());
6864 topicOffsetHighWaterMark .merge (
69- new TopicPartition ( record . topic (), record . partition ()) ,
65+ topicPartition ,
7066 record .offset () + 1 ,
7167 Math ::max );
68+
69+ final CompletedOffsets offset = completedOffsets .get (topicPartition );
70+ // please note that if offset exists, it could happen for record.offset() >= offset.nextOffsetToCommit()
71+ // when there're duplicate records which have lower offset than our next offset to commit consumed from broker
72+ if (offset == null ) {
73+ completedOffsets .put (topicPartition , new CompletedOffsets (record .offset () - 1L ));
74+ }
7275 }
7376
7477 @ Override
7578 public void markCompletedRecord (ConsumerRecord <K , V > record ) {
76- completedTopicOffsets .merge (
77- new TopicPartition (record .topic (), record .partition ()),
78- new OffsetAndMetadata (record .offset () + 1L ),
79- maxBy (comparing (OffsetAndMetadata ::offset )));
79+ final CompletedOffsets offset = completedOffsets .get (new TopicPartition (record .topic (), record .partition ()));
80+ // offset could be null, when the partition of the record was revoked before its processing was done
81+ if (offset != null ) {
82+ offset .addCompleteOffset (record .offset ());
83+ }
8084 }
8185
8286 @ Override
83- public Set <TopicPartition > syncPartialCommit () {
84- commitSync (completedTopicOffsets );
85- final Set <TopicPartition > partitions = checkCompletedPartitions ();
86- completedTopicOffsets .clear ();
87- for (TopicPartition p : partitions ) {
88- topicOffsetHighWaterMark .remove (p );
89- }
90- return partitions ;
87+ public void revokePartitions (Collection <TopicPartition > partitions ) {
88+ clearProcessingRecordStatesFor (partitions );
9189 }
9290
93- Set <TopicPartition > getCompletedPartitions (boolean noPendingRecords ) {
94- final Set <TopicPartition > partitions ;
95- if (noPendingRecords ) {
96- assert checkCompletedPartitions ().equals (topicOffsetHighWaterMark .keySet ())
97- : "expect: " + checkCompletedPartitions () + " actual: " + topicOffsetHighWaterMark .keySet ();
98- partitions = new HashSet <>(topicOffsetHighWaterMark .keySet ());
99- } else {
100- partitions = checkCompletedPartitions ();
91+ @ Override
92+ public Set <TopicPartition > partialCommitSync () {
93+ final Map <TopicPartition , OffsetAndMetadata > offsetsToCommit = completedTopicOffsetsToCommit ();
94+ if (offsetsToCommit .isEmpty ()) {
95+ return Collections .emptySet ();
10196 }
102- return partitions ;
97+ commitSyncWithRetry (offsetsToCommit );
98+ updatePartialCommittedOffsets (offsetsToCommit );
99+
100+ return clearProcessingRecordStatesForCompletedPartitions (offsetsToCommit );
103101 }
104102
105- void clearCachedCompletedPartitionsRecords (Set <TopicPartition > completedPartitions , boolean noPendingRecords ) {
106- completedTopicOffsets .clear ();
107- if (noPendingRecords ) {
108- topicOffsetHighWaterMark .clear ();
109- } else {
110- for (TopicPartition p : completedPartitions ) {
111- topicOffsetHighWaterMark .remove (p );
112- }
113- }
103+ Set <TopicPartition > fullCommitSync () {
104+ commitSyncWithRetry ();
105+
106+ final Set <TopicPartition > completePartitions = partitionsForAllRecordsStates ();
107+ clearAllProcessingRecordStates ();
108+ return completePartitions ;
114109 }
115110
116111 @ VisibleForTesting
117112 Map <TopicPartition , Long > topicOffsetHighWaterMark () {
118113 return topicOffsetHighWaterMark ;
119114 }
120115
121- @ VisibleForTesting
122- Map <TopicPartition , OffsetAndMetadata > completedTopicOffsets () {
123- return completedTopicOffsets ;
116+ Map <TopicPartition , OffsetAndMetadata > completedTopicOffsetsToCommit () {
117+ if (noCompletedOffsets ()) {
118+ return Collections .emptyMap ();
119+ }
120+
121+ final Map <TopicPartition , OffsetAndMetadata > offsets = new HashMap <>();
122+ for (Map .Entry <TopicPartition , CompletedOffsets > entry : completedOffsets .entrySet ()) {
123+ final CompletedOffsets offset = entry .getValue ();
124+ if (offset .hasOffsetToCommit ()) {
125+ offsets .put (entry .getKey (), offset .getOffsetToCommit ());
126+ }
127+ }
128+
129+ return offsets ;
130+ }
131+
132+ boolean noTopicOffsetsToCommit () {
133+ if (noCompletedOffsets ()) {
134+ return true ;
135+ }
136+
137+ for (Map .Entry <TopicPartition , CompletedOffsets > entry : completedOffsets .entrySet ()) {
138+ final CompletedOffsets offset = entry .getValue ();
139+ if (offset .hasOffsetToCommit ()) {
140+ return false ;
141+ }
142+ }
143+
144+ return true ;
145+ }
146+
147+ void updatePartialCommittedOffsets (Map <TopicPartition , OffsetAndMetadata > offsets ) {
148+ for (Map .Entry <TopicPartition , OffsetAndMetadata > entry : offsets .entrySet ()) {
149+ final CompletedOffsets offset = completedOffsets .get (entry .getKey ());
150+ offset .updateCommittedOffset (entry .getValue ().offset ());
151+ }
152+ }
153+
154+ boolean noCompletedOffsets () {
155+ return completedOffsets .isEmpty ();
124156 }
125157
126- void commitSync () {
158+ void commitSyncWithRetry () {
127159 final RetryContext context = context ();
128160 do {
129161 try {
@@ -135,7 +167,7 @@ void commitSync() {
135167 } while (true );
136168 }
137169
138- void commitSync (Map <TopicPartition , OffsetAndMetadata > offsets ) {
170+ void commitSyncWithRetry (Map <TopicPartition , OffsetAndMetadata > offsets ) {
139171 final RetryContext context = context ();
140172 do {
141173 try {
@@ -147,8 +179,34 @@ void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
147179 } while (true );
148180 }
149181
150- private Set <TopicPartition > checkCompletedPartitions () {
151- return completedTopicOffsets
182+ Set <TopicPartition > partitionsForAllRecordsStates () {
183+ return new HashSet <>(topicOffsetHighWaterMark .keySet ());
184+ }
185+
186+ void clearAllProcessingRecordStates () {
187+ topicOffsetHighWaterMark .clear ();
188+ completedOffsets .clear ();
189+ }
190+
191+ Set <TopicPartition > clearProcessingRecordStatesForCompletedPartitions (Map <TopicPartition , OffsetAndMetadata > committedOffsets ) {
192+ final Set <TopicPartition > partitions = partitionsToSafeResume (committedOffsets );
193+ clearProcessingRecordStatesFor (partitions );
194+ return partitions ;
195+ }
196+
197+ void clearProcessingRecordStatesFor (Collection <TopicPartition > partitions ) {
198+ for (TopicPartition p : partitions ) {
199+ topicOffsetHighWaterMark .remove (p );
200+ completedOffsets .remove (p );
201+ }
202+ }
203+
204+ Set <TopicPartition > partitionsToSafeResume () {
205+ return partitionsToSafeResume (completedTopicOffsetsToCommit ());
206+ }
207+
208+ Set <TopicPartition > partitionsToSafeResume (Map <TopicPartition , OffsetAndMetadata > completedOffsets ) {
209+ return completedOffsets
152210 .entrySet ()
153211 .stream ()
154212 .filter (entry -> topicOffsetMeetHighWaterMark (entry .getKey (), entry .getValue ()))
0 commit comments